From a640d7c5dbb1f945b46c0cf7d913f494357aaa73 Mon Sep 17 00:00:00 2001 From: Rohit V Date: Wed, 3 Jun 2026 17:37:03 +0530 Subject: [PATCH 1/5] Implemented SpotterViz tools --- src/metrics/runtime/tool-metrics.ts | 30 ++ src/servers/conversation-storage-server.ts | 35 +++ src/servers/mcp-server-base.ts | 16 + src/servers/mcp-server.ts | 154 +++++++++ src/servers/tool-definitions.ts | 175 +++++++++++ src/spotterviz/spotterviz-client.ts | 127 ++++++++ src/spotterviz/spotterviz-service.ts | 346 +++++++++++++++++++++ src/spotterviz/spotterviz-sse-stream.ts | 196 ++++++++++++ src/spotterviz/types.ts | 84 +++++ src/storage-service/storage-service.ts | 94 +++++- src/thoughtspot/thoughtspot-client.ts | 158 +++++++++- src/thoughtspot/thoughtspot-service.ts | 127 +++++--- src/thoughtspot/types.ts | 9 + 13 files changed, 1499 insertions(+), 52 deletions(-) create mode 100644 src/spotterviz/spotterviz-client.ts create mode 100644 src/spotterviz/spotterviz-service.ts create mode 100644 src/spotterviz/spotterviz-sse-stream.ts create mode 100644 src/spotterviz/types.ts diff --git a/src/metrics/runtime/tool-metrics.ts b/src/metrics/runtime/tool-metrics.ts index c86b7ee..31e93c7 100644 --- a/src/metrics/runtime/tool-metrics.ts +++ b/src/metrics/runtime/tool-metrics.ts @@ -21,6 +21,10 @@ export const UPSTREAM_OPERATION_NAMES = { "send_agent_conversation_message_streaming", importMetadataTml: "import_metadata_tml", searchMetadata: "search_metadata", + createBachPinboardSession: "create_bach_pinboard_session", + saveBachPinboard: "save_bach_pinboard", + createAuroraSession: "create_aurora_session", + submitAuroraQuery: "submit_aurora_query", } as const; export type UpstreamOperation = @@ -130,6 +134,32 @@ export function recordUpstreamCallMetrics( recorder.histogram(METRIC_NAMES.upstreamDurationMs, durationMs, labels); } +/** + * Run an upstream call and record its outcome + duration via `recordUpstreamCallMetrics`. + * Errors propagate to the caller — the metric is recorded as `upstream_error` first. + */ +export async function observeUpstreamCall( + recorder: MetricsRecorder | undefined, + operation: UpstreamOperation, + call: () => Promise, +): Promise { + const startedAt = Date.now(); + let outcome: MetricOutcome = "success"; + try { + return await call(); + } catch (error) { + outcome = "upstream_error"; + throw error; + } finally { + recordUpstreamCallMetrics( + recorder, + operation, + outcome, + Date.now() - startedAt, + ); + } +} + export function recordUpstreamStreamStartedMetric( recorder: MetricsRecorder | undefined, operation: UpstreamOperation, diff --git a/src/servers/conversation-storage-server.ts b/src/servers/conversation-storage-server.ts index a4f8b4f..6616c2e 100644 --- a/src/servers/conversation-storage-server.ts +++ b/src/servers/conversation-storage-server.ts @@ -8,6 +8,9 @@ const MESSAGE_KEY_PREFIX = "message-"; const IS_DONE_KEY = "is-done"; const WRITE_BOOKMARK_KEY = "write-bookmark"; const READ_BOOKMARK_KEY = "read-bookmark"; +const METADATA_KEY = "metadata"; + +export type ConversationMetadata = Record; /** * A Durable Object that stores streaming conversation messages and exposes them over HTTP. @@ -20,6 +23,8 @@ const READ_BOOKMARK_KEY = "read-bookmark"; * POST /storage//initialize —> initializeConversation * POST /storage//append —> appendMessagesAndRestartTtl * GET /storage//messages —> getNewMessagesAndUpdateBookmark + * GET /storage//metadata —> getMetadata + * PATCH /storage//metadata —> mergeMetadataAndRestartTtl */ export class ConversationStorageServerSQLite { private conversationId = ""; @@ -56,6 +61,21 @@ export class ConversationStorageServerSQLite { return Response.json(state); } + case "GET /metadata": { + const metadata = + await this.state.storage.get(METADATA_KEY); + if (metadata === undefined) { + return Response.json({ error: "Not found" }, { status: 404 }); + } + return Response.json(metadata); + } + + case "PATCH /metadata": { + const patch = (await request.json()) as ConversationMetadata; + const merged = await this.mergeMetadataAndRestartTtl(patch); + return Response.json(merged); + } + default: return new Response("Not Found", { status: 404 }); } @@ -86,6 +106,21 @@ export class ConversationStorageServerSQLite { await this.restartTtl(); } + /* + * Shallow-merge `patch` into the existing metadata object. Top-level keys in `patch` + * overwrite existing keys. And restarts the TTL. + */ + private async mergeMetadataAndRestartTtl( + patch: ConversationMetadata, + ): Promise { + const existing = + (await this.state.storage.get(METADATA_KEY)) ?? {}; + const merged = { ...existing, ...patch }; + await this.state.storage.put(METADATA_KEY, merged); + await this.restartTtl(); + return merged; + } + /* * Append new messages to the conversation, starting at the current state of WRITE_BOOKMARK and * saving the new state of WRITE_BOOKMARK after. Writes are done un bulk, but batched if there diff --git a/src/servers/mcp-server-base.ts b/src/servers/mcp-server-base.ts index 4df14f0..ecde6fe 100644 --- a/src/servers/mcp-server-base.ts +++ b/src/servers/mcp-server-base.ts @@ -27,6 +27,8 @@ import { recordToolInvocationMetrics, } from "../metrics/runtime/tool-metrics"; import { getActiveSpan, withSpan } from "../metrics/tracing/tracing-utils"; +import { SpotterVizClient } from "../spotterviz/spotterviz-client"; +import { SpotterVizService } from "../spotterviz/spotterviz-service"; import { StorageServiceClient } from "../storage-service/storage-service"; import { getThoughtSpotClient } from "../thoughtspot/thoughtspot-client"; import { ThoughtSpotService } from "../thoughtspot/thoughtspot-service"; @@ -228,6 +230,20 @@ export abstract class BaseMCPServer extends Server { ); } + protected async getSpotterVizService( + recorder?: MetricsRecorder, + ): Promise { + return new SpotterVizService( + this.getThoughtSpotService(recorder), + await this.getStorageService(), + new SpotterVizClient( + this.ctx.props.instanceUrl, + this.ctx.props.accessToken, + ), + recorder, + ); + } + protected abstract getToolMetricApiSurface(): ToolMetricApiSurface; protected getToolMetricApiVersionLabel(): string | undefined { diff --git a/src/servers/mcp-server.ts b/src/servers/mcp-server.ts index b4b4886..d600cca 100644 --- a/src/servers/mcp-server.ts +++ b/src/servers/mcp-server.ts @@ -25,6 +25,10 @@ import { GetRelevantQuestionsSchema, GetSessionUpdatesInputSchema, SendSessionMessageInputSchema, + SpotterVizCreateSessionInputSchema, + SpotterVizGetUpdatesInputSchema, + SpotterVizSaveLiveboardInputSchema, + SpotterVizSubmitQueryInputSchema, ToolName, } from "./tool-definitions"; import { @@ -241,6 +245,22 @@ export class MCPServer extends BaseMCPServer { return this.callCreateDashboard(request, recorder); } + case ToolName.SpotterVizCreateSession: { + return this.callSpotterVizCreateSession(request, recorder); + } + + case ToolName.SpotterVizSubmitQuery: { + return this.callSpotterVizSubmitQuery(request, recorder); + } + + case ToolName.SpotterVizGetUpdates: { + return this.callSpotterVizGetUpdates(request, recorder); + } + + case ToolName.SpotterVizSaveLiveboard: { + return this.callSpotterVizSaveLiveboard(request, recorder); + } + default: throw new Error(`Unknown tool: ${name}`); } @@ -605,4 +625,138 @@ Provide this url to the user as a link to view the liveboard in ThoughtSpot.`; }; return this._sources; } + + @WithSpan("call-spotterviz-save-liveboard") + async callSpotterVizSaveLiveboard( + request: z.infer, + recorder: MetricsRecorder, + ) { + const { spotterviz_session_id } = SpotterVizSaveLiveboardInputSchema.parse( + request.params.arguments, + ); + + try { + const service = await this.getSpotterVizService(recorder); + const { liveboardId, liveboardUrl } = await service.saveLiveboard({ + spotterVizSessionId: spotterviz_session_id, + }); + + return this.createStructuredContentSuccessResponse( + { liveboard_id: liveboardId, liveboard_url: liveboardUrl }, + "SpotterViz liveboard saved successfully", + ); + } catch (error) { + console.error("Error saving SpotterViz liveboard:", error); + return this.createErrorResponse( + `Failed to save SpotterViz liveboard: ${(error as Error).message}`, + "SpotterViz save liveboard failed", + ); + } + } + + @WithSpan("call-spotterviz-get-updates") + async callSpotterVizGetUpdates( + request: z.infer, + recorder: MetricsRecorder, + ) { + const { spotterviz_session_id } = SpotterVizGetUpdatesInputSchema.parse( + request.params.arguments, + ); + + try { + const service = await this.getSpotterVizService(recorder); + const { updates, isDone } = await service.getUpdates({ + spotterVizSessionId: spotterviz_session_id, + }); + + return this.createStructuredContentSuccessResponse( + { updates, is_done: isDone }, + "SpotterViz session updates retrieved successfully", + ); + } catch (error) { + console.error("Error getting SpotterViz updates:", error); + return this.createErrorResponse( + `Failed to get SpotterViz updates: ${(error as Error).message}`, + "SpotterViz get updates failed", + ); + } + } + + @WithSpan("call-spotterviz-submit-query") + async callSpotterVizSubmitQuery( + request: z.infer, + recorder: MetricsRecorder, + ) { + const { spotterviz_session_id, message } = + SpotterVizSubmitQueryInputSchema.parse(request.params.arguments); + + const storageService = await this.getStorageService(); + try { + await storageService.initializeConversation(spotterviz_session_id); + } catch (error) { + console.error( + "Error initializing SpotterViz conversation in storage service:", + error, + ); + return this.createErrorResponse( + "The SpotterViz session has an ongoing response to the previous message. Please continue to call `spotterviz_get_updates` until `is_done` is true before sending a followup message.", + `Error submitting SpotterViz query for session ${spotterviz_session_id}: ${error}`, + ); + } + + try { + const service = await this.getSpotterVizService(recorder); + const { streamPromise } = await service.submitQuery({ + spotterVizSessionId: spotterviz_session_id, + message, + }); + + // Hand the stream-drain off to the Worker runtime so we can return immediately. + // Falls through harmlessly in tests / non-Worker runtimes where waitUntil is absent. + this.ctx.ctx?.waitUntil?.(streamPromise); + + return this.createStructuredContentSuccessResponse( + { success: true }, + "SpotterViz query submitted successfully", + ); + } catch (error) { + console.error("Error submitting SpotterViz query:", error); + return this.createErrorResponse( + `Failed to submit SpotterViz query: ${(error as Error).message}`, + "SpotterViz submit query failed", + ); + } + } + + @WithSpan("call-spotterviz-create-session") + async callSpotterVizCreateSession( + request: z.infer, + recorder: MetricsRecorder, + ) { + const { new_liveboard_name, existing_liveboard_id } = + SpotterVizCreateSessionInputSchema.parse(request.params.arguments); + + try { + const service = await this.getSpotterVizService(recorder); + const result = await service.createSession({ + newLiveboardName: new_liveboard_name, + existingLiveboardId: existing_liveboard_id, + }); + + return this.createStructuredContentSuccessResponse( + { + spotterviz_session_id: result.spotterVizSessionId, + liveboard_id: result.liveboardId, + liveboard_name: result.liveboardName, + }, + "SpotterViz session created successfully", + ); + } catch (error) { + console.error("Error creating SpotterViz session:", error); + return this.createErrorResponse( + `Failed to create SpotterViz session: ${(error as Error).message}`, + "SpotterViz session create failed", + ); + } + } } diff --git a/src/servers/tool-definitions.ts b/src/servers/tool-definitions.ts index 8db2e91..d1f730b 100644 --- a/src/servers/tool-definitions.ts +++ b/src/servers/tool-definitions.ts @@ -257,6 +257,124 @@ export const CreateDashboardOutputSchema = z.object({ ), }); +export const SpotterVizCreateSessionInputSchema = z + .object({ + new_liveboard_name: z + .string() + .optional() + .describe( + "Name for a new, empty liveboard to be created. Provide this when the user wants to start a SpotterViz session from scratch.", + ), + existing_liveboard_id: z + .uuid() + .optional() + .describe( + "GUID of an existing liveboard to open in SpotterViz (the format is xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx). Provide this when the user wants to continue with an existing liveboard instead of creating a new one.", + ), + }) + .refine( + (d) => + (d.new_liveboard_name === undefined) !== + (d.existing_liveboard_id === undefined), + { + message: + "Exactly one of `new_liveboard_name` or `existing_liveboard_id` must be provided.", + }, + ); + +export const SpotterVizCreateSessionOutputSchema = z.object({ + spotterviz_session_id: z + .string() + .describe( + "Identifier for the SpotterViz session. Use this with future spotterviz_send_message / spotterviz_get_updates calls.", + ), + liveboard_id: z + .string() + .describe("GUID of the liveboard the session is bound to."), + liveboard_name: z + .string() + .optional() + .describe("Display name of the liveboard, if known."), +}); + +export const SpotterVizSubmitQueryInputSchema = z.object({ + spotterviz_session_id: z + .string() + .describe( + "Identifier of the SpotterViz session to send the message to. Use the value returned from `spotterviz_create_session`.", + ), + message: z + .string() + .min(1) + .max(2000) + .describe( + "Natural-language instruction or question to send to the SpotterViz agent. Maximum 2000 characters.", + ), +}); + +export const SpotterVizSubmitQueryOutputSchema = z.object({ + success: z + .boolean() + .describe( + "Whether the message was accepted and streaming started. After this returns, poll `spotterviz_get_updates` for the agent's response.", + ), +}); + +export const SpotterVizGetUpdatesInputSchema = z.object({ + spotterviz_session_id: z + .string() + .describe( + "Identifier of the SpotterViz session to fetch updates from. Use the value returned from `spotterviz_create_session`.", + ), +}); + +export const SpotterVizUpdateSchema = z.object({ + event_type: z + .string() + .describe( + "The Aurora SSE event type (e.g. `message.delta`, `control.action`, `meta.error`).", + ), + data: z + .record(z.string(), z.unknown()) + .describe("Raw event payload as emitted by Aurora."), + message_id: z.string().optional(), + idx: z.number().optional(), + timestamp: z.string().optional(), + tool_id: z.string().optional(), + group_id: z.string().optional(), + heading: z.string().optional(), +}); + +export const SpotterVizGetUpdatesOutputSchema = z.object({ + updates: z + .array(SpotterVizUpdateSchema) + .describe( + "Incremental SSE events emitted by the SpotterViz agent since the last call. Empty when the agent is still thinking and no new events have arrived within the wait window.", + ), + is_done: z + .boolean() + .describe( + "Whether the SpotterViz agent has finished responding for this turn. If false, call this tool again to continue polling.", + ), +}); + +export const SpotterVizSaveLiveboardInputSchema = z.object({ + spotterviz_session_id: z + .string() + .describe( + "Identifier of the SpotterViz session whose current liveboard state should be saved. Use the value returned from `spotterviz_create_session`.", + ), +}); + +export const SpotterVizSaveLiveboardOutputSchema = z.object({ + liveboard_id: z.string().describe("GUID of the saved liveboard."), + liveboard_url: z + .string() + .describe( + "URL where the user can view the saved liveboard in ThoughtSpot. Provide this link to the user when reporting that the liveboard was saved.", + ), +}); + export enum ToolName { // V1 Ping = "ping", @@ -270,6 +388,11 @@ export enum ToolName { SendSessionMessage = "send_session_message", GetSessionUpdates = "get_session_updates", CreateDashboard = "create_dashboard", + // SpotterViz (Aurora) + SpotterVizCreateSession = "spotterviz_create_session", + SpotterVizSubmitQuery = "spotterviz_submit_query", + SpotterVizGetUpdates = "spotterviz_get_updates", + SpotterVizSaveLiveboard = "spotterviz_save_liveboard", } export const toolDefinitionsV1 = [ @@ -398,4 +521,56 @@ export const toolDefinitionsV2 = [ openWorldHint: false, }, }, + { + name: ToolName.SpotterVizCreateSession, + description: + "Start a SpotterViz (liveboard agent) session against either a brand-new empty liveboard or an existing one. Exactly one of `new_liveboard_name` or `existing_liveboard_id` must be provided. When `new_liveboard_name` is set, an empty liveboard is created first. The returned `spotterviz_session_id` is the identifier for follow-up SpotterViz tools that send messages and stream updates.", + inputSchema: z.toJSONSchema(SpotterVizCreateSessionInputSchema), + outputSchema: z.toJSONSchema(SpotterVizCreateSessionOutputSchema), + annotations: { + title: "Create SpotterViz Session", + readOnlyHint: false, + destructiveHint: false, + openWorldHint: false, + }, + }, + { + name: ToolName.SpotterVizSubmitQuery, + description: + "Submit a natural-language message to an existing SpotterViz session. The SpotterViz agent streams its response asynchronously, so this tool returns immediately once streaming starts. Poll `spotterviz_get_updates` to retrieve the response. Do not call this again on the same `spotterviz_session_id` until the previous turn has finished (i.e. `spotterviz_get_updates` returns the turn-done signal); calling it concurrently will be rejected.", + inputSchema: z.toJSONSchema(SpotterVizSubmitQueryInputSchema), + outputSchema: z.toJSONSchema(SpotterVizSubmitQueryOutputSchema), + annotations: { + title: "Submit SpotterViz Query", + readOnlyHint: false, + destructiveHint: false, + openWorldHint: false, + }, + }, + { + name: ToolName.SpotterVizGetUpdates, + description: + "Get the latest streaming events from a SpotterViz session. Call this after `spotterviz_submit_query` and continue polling until `is_done` is true. The tool waits adaptively for new events (with internal exponential backoff up to 16 s) and returns early as soon as any events arrive or the turn finishes, so back-to-back calls cost no more than a quick poll when activity is high. An empty `updates` list with `is_done: false` simply means the agent is still thinking — call again to keep polling.", + inputSchema: z.toJSONSchema(SpotterVizGetUpdatesInputSchema), + outputSchema: z.toJSONSchema(SpotterVizGetUpdatesOutputSchema), + annotations: { + title: "Get SpotterViz Session Updates", + readOnlyHint: true, + destructiveHint: false, + openWorldHint: false, + }, + }, + { + name: ToolName.SpotterVizSaveLiveboard, + description: + "Persist the current state of the SpotterViz session's liveboard back to ThoughtSpot. Call this when the user has reached a result they want to keep — for example after a series of `spotterviz_submit_query` turns. The session stays active after saving, so further `spotterviz_submit_query` calls on the same session id continue to work. Returns a `liveboard_url` you can surface to the user as a link to the saved liveboard.", + inputSchema: z.toJSONSchema(SpotterVizSaveLiveboardInputSchema), + outputSchema: z.toJSONSchema(SpotterVizSaveLiveboardOutputSchema), + annotations: { + title: "Save SpotterViz Liveboard", + readOnlyHint: false, + destructiveHint: false, + openWorldHint: false, + }, + }, ]; diff --git a/src/spotterviz/spotterviz-client.ts b/src/spotterviz/spotterviz-client.ts new file mode 100644 index 0000000..d79f7b6 --- /dev/null +++ b/src/spotterviz/spotterviz-client.ts @@ -0,0 +1,127 @@ +/** + * SpotterViz client. + * + * Aurora is reached through the same ThoughtSpot instance — nginx upstreams `/aurora/*` to the + * Aurora MT host, rewrites the path (strips `/aurora`), and injects `X-Tenant-Host: $http_host` + * itself — so we just call `/aurora/` with the user's bearer token. + */ +import type { BachSession } from "../thoughtspot/types"; +import type { AuroraSessionContext, AuroraSessionInitResult } from "./types"; + +export class SpotterVizClient { + constructor( + readonly instanceUrl: string, + private readonly bearerToken: string, + private readonly locale: string = "en-US", + ) {} + + /** + * Build the headers for an Aurora request. The User-Agent override is mandatory — the AWS + * WAF in front of the Aurora path blocks the default `Cloudflare-Workers` UA that Workers' + * fetch injects. Pass `token` for endpoints that require the Aurora session JWT (everything + * except `/aurora/init`). + */ + private buildHeaders( + acceptMimeType: string, + token?: string, + ): Record { + const headers: Record = { + "Content-Type": "application/json", + "user-agent": "ThoughtSpot-ts-client", + Accept: acceptMimeType, + Authorization: `Bearer ${this.bearerToken}`, + }; + if (token) { + headers["X-Aurora-Session-Token"] = token; + } + return headers; + } + + /** + * Open an Aurora session bound to an active BACH pinboard generation. Returns the Aurora + * session id and a short-lived JWT that subsequent `submitAuroraQuery` calls must echo back + * via the `X-Aurora-Session-Token` header. + */ + async createAuroraSession( + bachSession: BachSession, + ): Promise { + const response = await fetch(`${this.instanceUrl}/aurora/init`, { + method: "POST", + headers: this.buildHeaders("application/json"), + body: JSON.stringify({ + session_id: { + transaction_id: bachSession.transactionId, + generation_number: bachSession.generationNumber, + }, + locale: this.locale, + starter_prompts: [], + tenant_flags: { + enable_charting_skill: true, + }, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `createAuroraSession failed (${response.status}): ${errorText}`, + ); + } + + const data = (await response.json()) as { + success?: boolean; + message?: string; + errors?: string[]; + jwt_token?: string; + session_id?: string; + liveboard_name?: string; + }; + + if (!data.success || !data.session_id || !data.jwt_token) { + const errMsg = + data.errors?.join("; ") || + data.message || + "Aurora /init returned an unsuccessful response"; + throw new Error(`createAuroraSession failed: ${errMsg}`); + } + + return { + auroraSessionId: data.session_id, + jwtToken: data.jwt_token, + liveboardName: data.liveboard_name, + }; + } + + /** + * Submit a user message to an existing Aurora session and obtain an SSE stream of agent events. + * The returned Response carries an SSE body; the caller must drain it via `body.getReader()` + */ + async submitAuroraQuery( + auroraCtx: AuroraSessionContext, + message: string, + ): Promise { + const response = await fetch(`${this.instanceUrl}/aurora/chat/stream`, { + method: "POST", + headers: this.buildHeaders("text/event-stream", auroraCtx.auroraJwtToken), + body: JSON.stringify({ + message, + session_id: { + transaction_id: auroraCtx.transactionId, + generation_number: auroraCtx.generationNumber, + }, + liveboard_id: auroraCtx.liveboardId, + stream: true, + locale: this.locale, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `submitAuroraQuery failed (${response.status}): ${errorText}`, + ); + } + + return response; + } +} diff --git a/src/spotterviz/spotterviz-service.ts b/src/spotterviz/spotterviz-service.ts new file mode 100644 index 0000000..ca6376d --- /dev/null +++ b/src/spotterviz/spotterviz-service.ts @@ -0,0 +1,346 @@ +import { SpanStatusCode } from "@opentelemetry/api"; +import type { MetricsRecorder } from "../metrics/runtime/metrics-recorder"; +import { + UPSTREAM_OPERATION_NAMES, + observeUpstreamCall, +} from "../metrics/runtime/tool-metrics"; +import { WithSpan, getActiveSpan } from "../metrics/tracing/tracing-utils"; +import type { StorageServiceClient } from "../storage-service/storage-service"; +import type { ThoughtSpotService } from "../thoughtspot/thoughtspot-service"; +import type { SpotterVizClient } from "./spotterviz-client"; +import { processAuroraSseStream } from "./spotterviz-sse-stream"; +import type { + AuroraSessionContext, + AuroraSessionInitResult, + CreateSpotterVizSessionParams, + CreateSpotterVizSessionResult, + GetSpotterVizUpdatesParams, + GetSpotterVizUpdatesResult, + SaveSpotterVizLiveboardParams, + SaveSpotterVizLiveboardResult, + SpotterVizEvent, + SubmitSpotterVizQueryParams, +} from "./types"; + +// Adaptive poll cadence for streaming SSE updates: 2 s → 4 s → 8 s → 16 s (held). +const GET_UPDATES_WAIT_SEQUENCE_MS = [2_000, 4_000, 8_000, 16_000]; + +/** + * Orchestrates a SpotterViz (Aurora) session: creation, per-turn query streaming, adaptive + * SSE polling, and committing the liveboard. Delegates BACH/liveboard ops to ThoughtSpotService + * and Aurora HTTP to SpotterVizClient; uses StorageServiceClient for the event log and to + * persist the Aurora session context. + */ +export class SpotterVizService { + constructor( + private readonly tsService: ThoughtSpotService, + private readonly storage: StorageServiceClient, + private readonly client: SpotterVizClient, + private readonly recorder?: MetricsRecorder, + ) {} + + /** + * Open a SpotterViz session: provision a new empty liveboard or attach to an existing one, + * start a BACH pinboard session, initialize Aurora, and persist the resulting context. + */ + @WithSpan("spotterviz-create-session") + async createSession( + params: CreateSpotterVizSessionParams, + ): Promise { + const span = getActiveSpan(); + const { newLiveboardName, existingLiveboardId } = params; + + try { + const liveboardId = await this.resolveLiveboardId( + newLiveboardName, + existingLiveboardId, + ); + span?.setAttribute("liveboard_id", liveboardId); + + const bachSession = + await this.tsService.createBachPinboardSession(liveboardId); + span?.setAttributes({ + bach_transaction_id: bachSession.transactionId, + bach_generation_number: bachSession.generationNumber, + }); + + const aurora = await observeUpstreamCall( + this.recorder, + UPSTREAM_OPERATION_NAMES.createAuroraSession, + () => this.client.createAuroraSession(bachSession), + ); + span?.setAttribute("aurora_session_id", aurora.auroraSessionId); + + await this.persistAuroraContext( + aurora, + bachSession, + liveboardId, + newLiveboardName, + ); + + span?.setStatus({ + code: SpanStatusCode.OK, + message: "SpotterViz session created", + }); + return { + spotterVizSessionId: aurora.auroraSessionId, + liveboardId, + liveboardName: aurora.liveboardName ?? newLiveboardName, + }; + } catch (error) { + span?.setStatus({ + code: SpanStatusCode.ERROR, + message: `Error creating SpotterViz session: ${(error as Error).message}`, + }); + throw error; + } + } + + /** + * Begin a new turn: open the SSE stream and return a `streamPromise` for the caller to plumb + * through `ctx.waitUntil`. The promise drains events into the DO and marks done at stream + * close. + */ + @WithSpan("spotterviz-submit-query") + async submitQuery( + params: SubmitSpotterVizQueryParams, + ): Promise<{ streamPromise: Promise }> { + const span = getActiveSpan(); + const { spotterVizSessionId, message } = params; + span?.setAttribute("spotterviz_session_id", spotterVizSessionId); + span?.setAttribute("message_length", message.length); + + try { + const context = + await this.storage.getMetadata( + spotterVizSessionId, + ); + + // Defensive: the previous turn may have ended without anyone calling get_updates, which + // would leave pollCount stale and make the first poll of this turn wait too long. + await this.storage.updateMetadata( + spotterVizSessionId, + { pollCount: 0 }, + ); + + const response = await observeUpstreamCall( + this.recorder, + UPSTREAM_OPERATION_NAMES.submitAuroraQuery, + () => this.client.submitAuroraQuery(context, message), + ); + + const reader = response.body?.getReader(); + if (!reader) { + throw new Error("Aurora /chat/stream returned no response body"); + } + + const streamPromise = processAuroraSseStream( + spotterVizSessionId, + reader, + this.storage, + ); + + span?.setStatus({ + code: SpanStatusCode.OK, + message: "SpotterViz turn started", + }); + return { streamPromise }; + } catch (error) { + // The handler primed the conversation before calling submitQuery; if we fail anywhere + // after that, mark it done so get_updates doesn't poll a stream that will never arrive. + await this.markSessionDoneBestEffort(spotterVizSessionId); + span?.setStatus({ + code: SpanStatusCode.ERROR, + message: `Error submitting SpotterViz query: ${(error as Error).message}`, + }); + throw error; + } + } + + /** + * Persist the SpotterViz session's current BACH generation to the saved liveboard. The session + * remains live afterwards so the user can keep iterating; only the liveboard is committed. + */ + @WithSpan("spotterviz-save-liveboard") + async saveLiveboard( + params: SaveSpotterVizLiveboardParams, + ): Promise { + const span = getActiveSpan(); + const { spotterVizSessionId } = params; + span?.setAttribute("spotterviz_session_id", spotterVizSessionId); + + try { + const context = + await this.storage.getMetadata( + spotterVizSessionId, + ); + if (!context.liveboardId) { + throw new Error( + "SpotterViz session metadata is missing a liveboardId; cannot save.", + ); + } + span?.setAttributes({ + liveboard_id: context.liveboardId, + bach_transaction_id: context.transactionId, + bach_generation_number: context.generationNumber, + }); + + await this.tsService.saveBachPinboard( + context.transactionId, + context.generationNumber, + ); + + const liveboardUrl = `${this.client.instanceUrl}/#/pinboard/${context.liveboardId}`; + span?.setStatus({ + code: SpanStatusCode.OK, + message: "SpotterViz liveboard saved", + }); + return { liveboardId: context.liveboardId, liveboardUrl }; + } catch (error) { + span?.setStatus({ + code: SpanStatusCode.ERROR, + message: `Error saving SpotterViz liveboard: ${(error as Error).message}`, + }); + throw error; + } + } + + /** + * Drain SSE events that have arrived since the caller's last poll. Fast-returns if the turn + * is already done; otherwise waits one step of `GET_UPDATES_WAIT_SEQUENCE_MS[pollCount]`, + * advances `pollCount`, and polls once more. `pollCount` resets when a turn finishes. + */ + @WithSpan("spotterviz-get-updates") + async getUpdates( + params: GetSpotterVizUpdatesParams, + ): Promise { + const span = getActiveSpan(); + const { spotterVizSessionId } = params; + span?.setAttribute("spotterviz_session_id", spotterVizSessionId); + + // Initial peek — if the turn is already done, fast-return without waiting. + const initial = + await this.storage.getNewEvents(spotterVizSessionId); + const collected: SpotterVizEvent[] = [...initial.messages]; + + if (initial.isDone) { + await this.resetPollCount(spotterVizSessionId); + span?.setAttributes({ + wait_time_ms: 0, + poll_count_used: 0, + total_session_updates: collected.length, + is_done: true, + }); + span?.setStatus({ + code: SpanStatusCode.OK, + message: "SpotterViz turn already done at peek", + }); + return { updates: collected, isDone: true }; + } + + // Load pollCount, sleep one step, advance pollCount, then poll once more. + const pollCount = await this.readPollCount(spotterVizSessionId); + const waitMs = + GET_UPDATES_WAIT_SEQUENCE_MS[ + Math.min(pollCount, GET_UPDATES_WAIT_SEQUENCE_MS.length - 1) + ]; + await new Promise((resolve) => setTimeout(resolve, waitMs)); + await this.storage.updateMetadata( + spotterVizSessionId, + { pollCount: pollCount + 1 }, + ); + + const followup = + await this.storage.getNewEvents(spotterVizSessionId); + collected.push(...followup.messages); + + if (followup.isDone) { + await this.resetPollCount(spotterVizSessionId); + } + + span?.setAttributes({ + wait_time_ms: waitMs, + poll_count_used: pollCount, + total_session_updates: collected.length, + is_done: followup.isDone, + }); + span?.setStatus({ + code: SpanStatusCode.OK, + message: "SpotterViz updates polled", + }); + + return { updates: collected, isDone: followup.isDone }; + } + + private async resolveLiveboardId( + newLiveboardName: string | undefined, + existingLiveboardId: string | undefined, + ): Promise { + if (newLiveboardName) { + const created = + await this.tsService.createEmptyLiveboard(newLiveboardName); + return created.liveboardId; + } + if (existingLiveboardId) { + return existingLiveboardId; + } + throw new Error( + "Could not resolve a liveboard id for the SpotterViz session.", + ); + } + + private async persistAuroraContext( + aurora: AuroraSessionInitResult, + bachSession: { transactionId: string; generationNumber: string }, + liveboardId: string, + newLiveboardName: string | undefined, + ): Promise { + const auroraContext: AuroraSessionContext = { + auroraSessionId: aurora.auroraSessionId, + auroraJwtToken: aurora.jwtToken, + transactionId: bachSession.transactionId, + generationNumber: bachSession.generationNumber, + liveboardId, + liveboardName: aurora.liveboardName ?? newLiveboardName, + }; + await this.storage.updateMetadata( + aurora.auroraSessionId, + auroraContext, + ); + } + + private async markSessionDoneBestEffort( + spotterVizSessionId: string, + ): Promise { + try { + await this.storage.appendEvents(spotterVizSessionId, [], true); + } catch (err) { + console.error( + `Failed to mark SpotterViz session ${spotterVizSessionId} done:`, + err, + ); + } + } + + private async readPollCount(spotterVizSessionId: string): Promise { + const metadata = + await this.storage.getMetadata(spotterVizSessionId); + return typeof metadata.pollCount === "number" ? metadata.pollCount : 0; + } + + private async resetPollCount(spotterVizSessionId: string): Promise { + try { + await this.storage.updateMetadata( + spotterVizSessionId, + { pollCount: 0 }, + ); + } catch (err) { + console.warn( + `Failed to reset pollCount for ${spotterVizSessionId}:`, + err, + ); + // Delibrately swallow the error as the events have already been drained. + // If this throws, the drained errors would be lost. + } + } +} diff --git a/src/spotterviz/spotterviz-sse-stream.ts b/src/spotterviz/spotterviz-sse-stream.ts new file mode 100644 index 0000000..b3a70b8 --- /dev/null +++ b/src/spotterviz/spotterviz-sse-stream.ts @@ -0,0 +1,196 @@ +import { type Span, SpanStatusCode } from "@opentelemetry/api"; +import { withSpan } from "../metrics/tracing/tracing-utils"; +import type { StorageServiceClient } from "../storage-service/storage-service"; +import type { AuroraSessionContext, SpotterVizEvent } from "./types"; + +/** + * Drain an Aurora `/aurora/chat/stream` SSE response into the conversation storage DO. + * + * Wire format: each SSE frame is `event: \ndata: \n\n`. We keep the event_type alongside + * the parsed data so the get_updates tool can project into the user-facing shape without re-parsing. + * + * Lifecycle: mirroring the spotter streaming pattern, the conversation is marked done only when + * the HTTP stream itself closes — never on in-band terminal events like `message.end`. Trailing + * frames from Aurora's SSE keepalive layer are simply appended like any other event. + * + * Side effect: when a `control.action` of `action: "lb_refresh"` carries a `new_gen_number`, the + * stored AuroraSessionContext.generationNumber is patched so the next turn addresses the right + * BACH state. + */ +export const processAuroraSseStream = async ( + conversationId: string, + reader: ReadableStreamDefaultReader, + storage: StorageServiceClient, +): Promise => { + return await withSpan("process-spotterviz-sse-stream", async (span: Span) => { + span.setAttribute("conversation_id", conversationId); + + let nEvents = 0; + let nGenNumberUpdates = 0; + let spanHasError = false; + + try { + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + + if (done) { + await storage.appendEvents(conversationId, [], true); + break; + } + + buffer += decoder.decode(value, { stream: true }); + + // SSE frames are separated by a blank line. Anything after the last `\n\n` is a + // partial frame; keep it in the buffer for the next loop. + const lastBoundary = buffer.lastIndexOf("\n\n"); + if (lastBoundary === -1) { + continue; + } + const completeFrames = buffer.slice(0, lastBoundary); + buffer = buffer.slice(lastBoundary + 2); + + const newEvents: SpotterVizEvent[] = []; + let genNumberPatch: string | undefined; + + for (const rawFrame of completeFrames.split("\n\n")) { + const frame = rawFrame.trim(); + if (!frame || frame.startsWith(":")) { + // Blank or SSE comment (e.g. heartbeats). + continue; + } + + const parsed = parseSseFrame(frame); + if (!parsed) { + continue; + } + + newEvents.push(parsed); + nEvents++; + + if (parsed.event_type === "meta.error") { + spanHasError = true; + const msg = + (parsed.data?.message as string | undefined) ?? "unknown error"; + span.setStatus({ + code: SpanStatusCode.ERROR, + message: `Aurora meta.error: ${msg}`, + }); + } else if (parsed.event_type === "control.action") { + const action = parsed.data?.action; + const metadata = parsed.data?.metadata as + | Record + | undefined; + if (action === "lb_refresh" && metadata?.new_gen_number != null) { + // Aurora emits new_gen_number as an int; we keep it as a string to match + // the rest of the BACH session shape in AuroraSessionContext. + genNumberPatch = String(metadata.new_gen_number); + } + } + } + + if (genNumberPatch !== undefined) { + try { + await storage.updateMetadata(conversationId, { + generationNumber: genNumberPatch, + }); + nGenNumberUpdates++; + } catch (err) { + console.error( + `Failed to patch generationNumber for ${conversationId}:`, + err, + ); + } + } + + if (newEvents.length > 0) { + await storage.appendEvents(conversationId, newEvents); + } + } + } catch (error) { + console.error( + `Error processing Aurora SSE stream for ${conversationId}:`, + error, + ); + spanHasError = true; + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + // Best-effort: mark the conversation done so get_updates doesn't hang on a turn that + // will never produce more events. + try { + await storage.appendEvents(conversationId, [], true); + } catch (markErr) { + console.error( + `Failed to mark conversation ${conversationId} done after stream error:`, + markErr, + ); + } + } + + span.setAttributes({ + total_events_parsed: nEvents, + gen_number_updates: nGenNumberUpdates, + }); + if (!spanHasError) { + span.setStatus({ + code: SpanStatusCode.OK, + message: "Aurora SSE stream concluded", + }); + } + }); +}; + +/** + * Parse a single SSE frame ("event: \ndata: \n[...]") into a `SpotterVizEvent`. + * Returns `null` for frames we can't interpret (missing data field, malformed JSON, etc.) — + * those are logged but don't abort the stream, since a single bad frame shouldn't kill the turn. + */ +function parseSseFrame(frame: string): SpotterVizEvent | null { + let eventType: string | undefined; + const dataLines: string[] = []; + + for (const line of frame.split("\n")) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + dataLines.push(line.slice(5).trim()); + } + // Other SSE fields (id:, retry:) are not used by Aurora. + } + + if (dataLines.length === 0) { + return null; + } + + const dataJson = dataLines.join("\n"); + let parsed: Record; + try { + parsed = JSON.parse(dataJson); + } catch (err) { + console.warn( + `Aurora SSE frame had unparseable data (event=${eventType ?? "unknown"}):`, + err, + ); + return null; + } + + // Aurora's BaseEvent already includes `event_type`; if the SSE `event:` field is missing, + // fall back to whatever the data payload says. + const inferredType = + eventType ?? (parsed.event_type as string | undefined) ?? "unknown"; + + return { + event_type: inferredType, + data: (parsed.data as Record) ?? {}, + message_id: parsed.message_id as string | undefined, + idx: parsed.idx as number | undefined, + timestamp: parsed.timestamp as string | undefined, + tool_id: parsed.tool_id as string | undefined, + group_id: parsed.group_id as string | undefined, + heading: parsed.heading as string | undefined, + }; +} diff --git a/src/spotterviz/types.ts b/src/spotterviz/types.ts new file mode 100644 index 0000000..9538f60 --- /dev/null +++ b/src/spotterviz/types.ts @@ -0,0 +1,84 @@ +/** + * Types for the SpotterViz (Aurora liveboard agent) integration. + * + * `BachSession` lives in `src/thoughtspot/types.ts` because BACH is a general ThoughtSpot + * concept; the types below are SpotterViz-specific. + */ + +export interface AuroraSessionInitResult { + auroraSessionId: string; + jwtToken: string; + liveboardName?: string; +} + +/** + * Aurora session context stored in the shared ConversationStorageServer DO under the metadata + * slot. Follow-up SpotterViz tools read this to address the right Aurora session. The instance + * URL is not stored — it's taken fresh from ctx.props on each request. + */ +export interface AuroraSessionContext extends Record { + auroraSessionId: string; + auroraJwtToken: string; + transactionId: string; + generationNumber: string; + liveboardId?: string; + liveboardName?: string; + /** + * Number of `get_updates` calls already issued for the current turn. Drives the per-call wait + * backoff. Reset to 0 whenever a turn is observed to be done and whenever a new turn starts via + * `submitQuery`. Absent means "not yet polled" (effectively 0). + */ + pollCount?: number; +} + +export interface CreateSpotterVizSessionParams { + newLiveboardName?: string; + existingLiveboardId?: string; +} + +export interface CreateSpotterVizSessionResult { + spotterVizSessionId: string; + liveboardId: string; + liveboardName?: string; +} + +/** + * Aurora SSE event format. + */ +export interface SpotterVizEvent extends Record { + event_type: string; + data: Record; + message_id?: string; + idx?: number; + timestamp?: string; + tool_id?: string; + group_id?: string; + heading?: string; +} + +export interface SubmitSpotterVizQueryParams { + spotterVizSessionId: string; + message: string; +} + +export interface SubmitSpotterVizQueryResult { + success: true; +} + +export interface GetSpotterVizUpdatesParams { + spotterVizSessionId: string; +} + +export interface GetSpotterVizUpdatesResult { + updates: SpotterVizEvent[]; + isDone: boolean; +} + +export interface SaveSpotterVizLiveboardParams { + spotterVizSessionId: string; +} + +export interface SaveSpotterVizLiveboardResult { + liveboardId: string; + liveboardUrl: string; +} diff --git a/src/storage-service/storage-service.ts b/src/storage-service/storage-service.ts index a57660c..dda592b 100644 --- a/src/storage-service/storage-service.ts +++ b/src/storage-service/storage-service.ts @@ -1,3 +1,4 @@ +import type { ConversationMetadata } from "../servers/conversation-storage-server"; import type { Message, StreamingMessagesState } from "../thoughtspot/types"; /** @@ -8,6 +9,8 @@ import type { Message, StreamingMessagesState } from "../thoughtspot/types"; * POST /storage//initialize —> initializeConversation * POST /storage//append —> appendMessagesAndRestartTtl * GET /storage//messages —> getNewMessagesAndUpdateBookmark + * GET /storage//metadata —> getMetadata + * PATCH /storage//metadata —> mergeMetadata * * The storageId is derived by taking a hash of the user's access token and combining it with the * conversationId, to ensure no users can access each other's conversations. @@ -56,6 +59,53 @@ export class StorageServiceClient { } } + /** + * Retrieve the metadata stored for this conversation. Throws if the conversation is unknown. + */ + async getMetadata( + conversationId: string, + ): Promise { + const response = await this.stubFor(conversationId).fetch( + this.url(conversationId, "metadata"), + { method: "GET", headers: this.headers() }, + ); + + if (!response.ok) { + const text = await response.text(); + throw new Error( + `Failed to get conversation metadata (${response.status}): ${text}`, + ); + } + + return response.json() as Promise; + } + + /** + * Shallow-merge a partial metadata patch into the existing metadata. Returns the merged metadata. + */ + async updateMetadata( + conversationId: string, + patch: Partial, + ): Promise { + const response = await this.stubFor(conversationId).fetch( + this.url(conversationId, "metadata"), + { + method: "PATCH", + headers: this.headers(), + body: JSON.stringify(patch), + }, + ); + + if (!response.ok) { + const text = await response.text(); + throw new Error( + `Failed to update conversation metadata (${response.status}): ${text}`, + ); + } + + return response.json() as Promise; + } + /** * Append new messages to a conversation and restart its TTL. * Optionally mark the conversation as done. @@ -65,7 +115,33 @@ export class StorageServiceClient { messages: Message[], isDone = false, ): Promise { - const body: StreamingMessagesState = { messages, isDone }; + return this.appendEvents(conversationId, messages, isDone); + } + + /** + * Retrieve all messages that have been added since the last call to this method + * (tracked via a per-conversation bookmark) and advance the bookmark. + * Also returns whether the conversation has been marked done. + */ + async getNewMessages( + conversationId: string, + ): Promise { + return this.getNewEvents(conversationId); + } + + /** + * Type-generic variant of `appendMessages`. The DO stores entries opaquely under indexed keys + * — the typed wrapper exists so SpotterViz can stream `SpotterVizEvent` objects through the + * same bookmark / TTL machinery used for spotter `Message` objects without leaking the + * SpotterViz type into the spotter API. + */ + async appendEvents( + conversationId: string, + events: T[], + isDone = false, + ): Promise { + // Wire field stays "messages" so the DO route is shared between the two callers. + const body = { messages: events, isDone }; const response = await this.stubFor(conversationId).fetch( this.url(conversationId, "append"), @@ -74,20 +150,16 @@ export class StorageServiceClient { if (!response.ok) { const text = await response.text(); - throw new Error( - `Failed to append messages (${response.status}): ${text}`, - ); + throw new Error(`Failed to append events (${response.status}): ${text}`); } } /** - * Retrieve all messages that have been added since the last call to this method - * (tracked via a per-conversation bookmark) and advance the bookmark. - * Also returns whether the conversation has been marked done. + * Type-generic variant of `getNewMessages`. See `appendEvents` for the rationale. */ - async getNewMessages( + async getNewEvents( conversationId: string, - ): Promise { + ): Promise<{ messages: T[]; isDone: boolean }> { const response = await this.stubFor(conversationId).fetch( this.url(conversationId, "messages"), { method: "GET", headers: this.headers() }, @@ -95,9 +167,9 @@ export class StorageServiceClient { if (!response.ok) { const text = await response.text(); - throw new Error(`Failed to get messages (${response.status}): ${text}`); + throw new Error(`Failed to get events (${response.status}): ${text}`); } - return response.json() as Promise; + return response.json() as Promise<{ messages: T[]; isDone: boolean }>; } } diff --git a/src/thoughtspot/thoughtspot-client.ts b/src/thoughtspot/thoughtspot-client.ts index cf88054..31d7217 100644 --- a/src/thoughtspot/thoughtspot-client.ts +++ b/src/thoughtspot/thoughtspot-client.ts @@ -1,16 +1,16 @@ import { - createBearerAuthenticationConfig, ThoughtSpotRestApi, + createBearerAuthenticationConfig, } from "@thoughtspot/rest-api-sdk"; import type { AgentConversation, RequestContext, ResponseContext, } from "@thoughtspot/rest-api-sdk"; -import YAML from "yaml"; -import { of } from "rxjs"; -import type { SessionInfo } from "./types"; import { customAlphabet } from "nanoid"; +import { of } from "rxjs"; +import YAML from "yaml"; +import type { BachSession, SessionInfo } from "./types"; /* * Inject custom handlers into the ThoughtSpot client @@ -42,6 +42,8 @@ export const getThoughtSpotClient = ( addGetAnswerSession(client, instanceUrl, bearerToken); addCreateAgentConversationWithAutoMode(client, instanceUrl, bearerToken); addSendAgentConversationMessageStreaming(client, instanceUrl, bearerToken); + addCreateBachPinboardSession(client, instanceUrl, bearerToken); + addSaveBachPinboard(client, instanceUrl, bearerToken); return client; }; @@ -261,6 +263,154 @@ function addCreateAgentConversationWithAutoMode( }; } +/* + * Initiate a new BACH pinboard session for a liveboard. + */ +function addCreateBachPinboardSession( + client: any, + instanceUrl: string, + token: string, +) { + (client as any).createBachPinboardSession = async ({ + liveboardId, + }: { liveboardId: string }): Promise => { + const endpoint = "/callosum/v1/bach/pinboard/"; + // Java callers (e.g. BachPinboardRequestBuilder) seed the session with a fresh GUID rather + // than leaving the field empty; mirror that to avoid validation rejections. + const clientTransactionId = crypto.randomUUID(); + const fetchOptions = { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json", + "user-agent": "ThoughtSpot-ts-client", + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ + pinboardSession: { transactionId: clientTransactionId }, + pinboardRequests: [ + { + type: "LOAD_PINBOARD", + loadPinboard: { savedPinboardId: liveboardId }, + }, + ], + }), + }; + const response = await fetch(`${instanceUrl}${endpoint}`, fetchOptions); + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `createBachPinboardSession failed with status ${response.status}: ${errorText}`, + ); + } + + const data = (await response.json()) as { + status?: { + statusCode?: string; + errorMessage?: string; + errorCode?: string; + }; + pinboardSession?: { + transactionId?: string; + generationNumber?: string | number; + }; + pinboardResponses?: Array<{ + loadPinboard?: { + status?: { statusCode?: string; errorMessage?: string }; + }; + status?: { statusCode?: string; errorMessage?: string }; + }>; + }; + + const statusCode = data?.status?.statusCode; + if (statusCode && statusCode !== "OK") { + const detail = + data.status?.errorMessage || + data.pinboardResponses?.[0]?.loadPinboard?.status?.errorMessage || + data.pinboardResponses?.[0]?.status?.errorMessage || + JSON.stringify(data.status); + throw new Error( + `createBachPinboardSession returned non-OK status ${statusCode}: ${detail}`, + ); + } + + const transactionId = data?.pinboardSession?.transactionId; + const generationNumber = data?.pinboardSession?.generationNumber; + if (!transactionId || generationNumber === undefined) { + throw new Error( + `createBachPinboardSession: missing session ids in response: ${JSON.stringify(data)}`, + ); + } + return { transactionId, generationNumber: String(generationNumber) }; + }; +} + +/* + * Save the current state of the pinboard. + */ +function addSaveBachPinboard(client: any, instanceUrl: string, token: string) { + (client as any).saveBachPinboard = async ({ + transactionId, + generationNumber, + }: { + transactionId: string; + generationNumber: string; + }): Promise => { + const endpoint = "/callosum/v1/bach/pinboard/"; + const fetchOptions = { + method: "POST", + headers: { + "Content-Type": "application/json", + Accept: "application/json", + "user-agent": "ThoughtSpot-ts-client", + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ + pinboardSession: { transactionId, generationNumber }, + pinboardRequests: [ + { + type: "SAVE_PINBOARD", + savePinboard: {}, + }, + ], + }), + }; + const response = await fetch(`${instanceUrl}${endpoint}`, fetchOptions); + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `saveBachPinboard failed with status ${response.status}: ${errorText}`, + ); + } + + const data = (await response.json()) as { + status?: { + statusCode?: string; + errorMessage?: string; + errorCode?: string; + }; + pinboardResponses?: Array<{ + savePinboard?: { + status?: { statusCode?: string; errorMessage?: string }; + }; + status?: { statusCode?: string; errorMessage?: string }; + }>; + }; + + const statusCode = data?.status?.statusCode; + if (statusCode && statusCode !== "OK") { + const detail = + data.status?.errorMessage || + data.pinboardResponses?.[0]?.savePinboard?.status?.errorMessage || + data.pinboardResponses?.[0]?.status?.errorMessage || + JSON.stringify(data.status); + throw new Error( + `saveBachPinboard returned non-OK status ${statusCode}: ${detail}`, + ); + } + }; +} + /* * Generator initialized once at module level so the internal buffers and state * are pre-computed once and reused across calls — important in streaming scenarios diff --git a/src/thoughtspot/thoughtspot-service.ts b/src/thoughtspot/thoughtspot-service.ts index 974cb17..eaf375c 100644 --- a/src/thoughtspot/thoughtspot-service.ts +++ b/src/thoughtspot/thoughtspot-service.ts @@ -15,8 +15,7 @@ import { createRequestMetricsRecorder } from "../metrics/runtime/request-metrics import type { MetricsEnvLike } from "../metrics/runtime/runtime-config"; import { UPSTREAM_OPERATION_NAMES, - type UpstreamOperation, - recordUpstreamCallMetrics, + observeUpstreamCall, recordUpstreamStreamStartedMetric, } from "../metrics/runtime/tool-metrics"; import { WithSpan, getActiveSpan } from "../metrics/tracing/tracing-utils"; @@ -46,28 +45,6 @@ export class ThoughtSpotService { private readonly metrics: ThoughtSpotServiceMetricsOptions = {}, ) {} - private async observeUpstreamCall( - operation: UpstreamOperation, - call: () => Promise, - ): Promise { - const startedAt = Date.now(); - let outcome: "success" | "upstream_error" = "success"; - - try { - return await call(); - } catch (error) { - outcome = "upstream_error"; - throw error; - } finally { - recordUpstreamCallMetrics( - this.metrics.recorder, - operation, - outcome, - Date.now() - startedAt, - ); - } - } - private createStreamMetricsRecorder(): MetricsRecorder { const recorder = createRequestMetricsRecorder(this.metrics.metricsEnv); recorder.setAnalyticsContext(this.metrics.analyticsContext); @@ -109,7 +86,8 @@ export class ThoughtSpotService { span?.setAttribute("query", query); span?.addEvent("query-get-data-source-suggestions"); - const response = await this.observeUpstreamCall( + const response = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.getDataSourceSuggestions, () => this.client.getDataSourceSuggestions({ @@ -176,7 +154,8 @@ export class ThoughtSpotService { ); span?.addEvent("get-decomposed-query"); - const resp = await this.observeUpstreamCall( + const resp = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.queryGetDecomposedQuery, () => this.client.queryGetDecomposedQuery({ @@ -243,7 +222,8 @@ export class ThoughtSpotService { generation_number, }); - const data = await this.observeUpstreamCall( + const data = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.exportAnswerReport, () => this.client.exportAnswerReport({ @@ -286,7 +266,8 @@ export class ThoughtSpotService { try { span?.setAttribute("session_identifier", session_identifier); - const tml = await this.observeUpstreamCall( + const tml = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.exportUnsavedAnswerTml, () => (this.client as any).exportUnsavedAnswerTML({ @@ -317,7 +298,8 @@ export class ThoughtSpotService { try { // Use auto mode by default, but support passing an explicit data source context - const response = await this.observeUpstreamCall( + const response = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.createAgentConversation, () => (this.client as any).createAgentConversationWithAutoMode({ @@ -369,7 +351,8 @@ export class ThoughtSpotService { // Validate the response body inside the observed call so an unusable streaming // response is counted as `upstream_error` instead of a false success. - const reader = await this.observeUpstreamCall( + const reader = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.sendAgentConversationMessageStreaming, async () => { const response = await ( @@ -460,7 +443,8 @@ export class ThoughtSpotService { ); try { - const answer = await this.observeUpstreamCall( + const answer = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.singleAnswer, () => this.client.singleAnswer({ @@ -477,7 +461,8 @@ export class ThoughtSpotService { const [data, session, tml] = await Promise.all([ this.getAnswerData(question, session_identifier, generation_number), - this.observeUpstreamCall( + observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.getAnswerSession, () => (this.client as any).getAnswerSession({ @@ -594,6 +579,69 @@ export class ThoughtSpotService { } } + /** + * Create an empty liveboard with just a name. Returns the new liveboard's GUID. + */ + @WithSpan("create-empty-liveboard") + async createEmptyLiveboard(name: string): Promise<{ liveboardId: string }> { + const tml = { + liveboard: { name, visualizations: [], layout: { tiles: [] } }, + }; + + const resp = await observeUpstreamCall( + this.metrics.recorder, + UPSTREAM_OPERATION_NAMES.importMetadataTml, + () => + this.client.importMetadataTML({ + metadata_tmls: [JSON.stringify(tml)], + import_policy: "ALL_OR_NONE", + create_new: true, + }), + ); + + const liveboardId = (resp as any)?.[0]?.response?.header?.id_guid; + if (!liveboardId) { + throw new Error( + `createEmptyLiveboard: id_guid missing in response: ${JSON.stringify(resp)}`, + ); + } + return { liveboardId }; + } + + /** + * Initiate a new BACH pinboard session for a liveboard. + */ + @WithSpan("create-bach-pinboard-session") + async createBachPinboardSession( + liveboardId: string, + ): Promise<{ transactionId: string; generationNumber: string }> { + return observeUpstreamCall( + this.metrics.recorder, + UPSTREAM_OPERATION_NAMES.createBachPinboardSession, + () => (this.client as any).createBachPinboardSession({ liveboardId }), + ); + } + + /** + * Commit the current generation of a BACH pinboard session back to the saved liveboard. The + * BACH session is unaffected by the save and can continue to receive further mutations. + */ + @WithSpan("save-bach-pinboard") + async saveBachPinboard( + transactionId: string, + generationNumber: string, + ): Promise { + await observeUpstreamCall( + this.metrics.recorder, + UPSTREAM_OPERATION_NAMES.saveBachPinboard, + () => + (this.client as any).saveBachPinboard({ + transactionId, + generationNumber, + }), + ); + } + /** * Create liveboard from answers */ @@ -625,7 +673,8 @@ export class ThoughtSpotService { }, }; - const resp = await this.observeUpstreamCall( + const resp = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.importMetadataTml, () => this.client.importMetadataTML({ @@ -651,7 +700,8 @@ export class ThoughtSpotService { span?.addEvent("get-data-sources"); - const resp = await this.observeUpstreamCall( + const resp = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.searchMetadata, () => this.client.searchMetadata({ @@ -688,7 +738,8 @@ export class ThoughtSpotService { async getSessionInfo(): Promise { const span = getActiveSpan(); - const info = await this.observeUpstreamCall( + const info = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.getSessionInfo, () => (this.client as any).getSessionInfo(), ); @@ -724,7 +775,8 @@ export class ThoughtSpotService { async searchWorksheets(searchTerm: string): Promise { const span = getActiveSpan(); - const resp = await this.observeUpstreamCall( + const resp = await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.searchMetadata, () => this.client.searchMetadata({ @@ -763,7 +815,8 @@ export class ThoughtSpotService { @WithSpan("validate-connection") async validateConnection(): Promise { try { - await this.observeUpstreamCall( + await observeUpstreamCall( + this.metrics.recorder, UPSTREAM_OPERATION_NAMES.getSessionInfo, () => (this.client as any).getSessionInfo(), ); diff --git a/src/thoughtspot/types.ts b/src/thoughtspot/types.ts index b4d6a60..d3e7890 100644 --- a/src/thoughtspot/types.ts +++ b/src/thoughtspot/types.ts @@ -59,3 +59,12 @@ export interface Answer { session_identifier: string; generation_number: number; } + +/** + * Identifier for a BACH pinboard session (returned by the BACH service when a pinboard is + * loaded or mutated; required on subsequent calls that operate on the same session). + */ +export interface BachSession { + transactionId: string; + generationNumber: string; +} From ee56324e56196e40c23e626b20f1746e9a009b64 Mon Sep 17 00:00:00 2001 From: Rohit V Date: Thu, 4 Jun 2026 11:21:56 +0530 Subject: [PATCH 2/5] Added UTs --- test/servers/mcp-server.spec.ts | 16 +- test/spotterviz/spotterviz-client.spec.ts | 253 ++++++++ test/spotterviz/spotterviz-service.spec.ts | 552 ++++++++++++++++++ test/spotterviz/spotterviz-sse-stream.spec.ts | 313 ++++++++++ test/storage-service/storage-service.spec.ts | 206 ++++++- test/thoughtspot/thoughtspot-client.spec.ts | 255 +++++++- test/thoughtspot/thoughtspot-service.spec.ts | 104 ++++ 7 files changed, 1689 insertions(+), 10 deletions(-) create mode 100644 test/spotterviz/spotterviz-client.spec.ts create mode 100644 test/spotterviz/spotterviz-service.spec.ts create mode 100644 test/spotterviz/spotterviz-sse-stream.spec.ts diff --git a/test/servers/mcp-server.spec.ts b/test/servers/mcp-server.spec.ts index 1eb50d7..b3b05fb 100644 --- a/test/servers/mcp-server.spec.ts +++ b/test/servers/mcp-server.spec.ts @@ -164,14 +164,18 @@ describe("MCP Server", () => { const result = await listTools(); - // V2 tools (latest version): 5 tools - expect(result.tools).toHaveLength(5); + // V2 tools (latest version): 5 Spotter + 4 SpotterViz tools + expect(result.tools).toHaveLength(9); expect(result.tools?.map((t) => t.name)).toEqual([ "check_connectivity", "create_analysis_session", "send_session_message", "get_session_updates", "create_dashboard", + "spotterviz_create_session", + "spotterviz_submit_query", + "spotterviz_get_updates", + "spotterviz_save_liveboard", ]); }); @@ -201,7 +205,7 @@ describe("MCP Server", () => { ); }); - it("should return 5 tools regardless of enableSpotterDataSourceDiscovery when using latest (V2)", async () => { + it("should return 9 tools regardless of enableSpotterDataSourceDiscovery when using latest (V2)", async () => { // Mock getThoughtSpotClient with enableSpotterDataSourceDiscovery set to false vi.spyOn(thoughtspotClient, "getThoughtSpotClient").mockReturnValue({ getSessionInfo: vi.fn().mockResolvedValue({ @@ -235,13 +239,17 @@ describe("MCP Server", () => { const result = await listTools(); // V2 tools don't have a datasource discovery tool, so filtering has no effect - expect(result.tools).toHaveLength(5); + expect(result.tools).toHaveLength(9); expect(result.tools?.map((t) => t.name)).toEqual([ "check_connectivity", "create_analysis_session", "send_session_message", "get_session_updates", "create_dashboard", + "spotterviz_create_session", + "spotterviz_submit_query", + "spotterviz_get_updates", + "spotterviz_save_liveboard", ]); }); }); diff --git a/test/spotterviz/spotterviz-client.spec.ts b/test/spotterviz/spotterviz-client.spec.ts new file mode 100644 index 0000000..6e70561 --- /dev/null +++ b/test/spotterviz/spotterviz-client.spec.ts @@ -0,0 +1,253 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { SpotterVizClient } from "../../src/spotterviz/spotterviz-client"; +import type { + AuroraSessionContext, + AuroraSessionInitResult, +} from "../../src/spotterviz/types"; +import type { BachSession } from "../../src/thoughtspot/types"; + +const INSTANCE_URL = "https://test.thoughtspot.com"; +const BEARER = "test-bearer-token"; + +const bachSession: BachSession = { + transactionId: "txn-1", + generationNumber: "7", +}; + +function makeJsonResponse( + body: unknown, + init: { status?: number; ok?: boolean } = {}, +): Response { + return { + ok: init.ok ?? (init.status ?? 200) < 400, + status: init.status ?? 200, + json: vi.fn().mockResolvedValue(body), + text: vi.fn().mockResolvedValue(typeof body === "string" ? body : ""), + } as unknown as Response; +} + +function makeErrorResponse(status: number, text: string): Response { + return { + ok: false, + status, + text: vi.fn().mockResolvedValue(text), + json: vi.fn().mockResolvedValue({}), + } as unknown as Response; +} + +describe("SpotterVizClient", () => { + beforeEach(() => { + global.fetch = vi.fn(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe("createAuroraSession", () => { + it("POSTs to /aurora/init with the BACH session ids and parses the success response", async () => { + const apiResponse = { + success: true, + session_id: "aurora-sess-1", + jwt_token: "jwt-abc", + liveboard_name: "Saved LB", + }; + (fetch as any).mockResolvedValue(makeJsonResponse(apiResponse)); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + const result: AuroraSessionInitResult = + await client.createAuroraSession(bachSession); + + expect(fetch).toHaveBeenCalledTimes(1); + const [url, options] = (fetch as any).mock.calls[0]; + expect(url).toBe(`${INSTANCE_URL}/aurora/init`); + expect(options.method).toBe("POST"); + + const body = JSON.parse(options.body); + expect(body.session_id).toEqual({ + transaction_id: bachSession.transactionId, + generation_number: bachSession.generationNumber, + }); + expect(body.locale).toBe("en-US"); + expect(body.starter_prompts).toEqual([]); + expect(body.tenant_flags).toEqual({ enable_charting_skill: true }); + + expect(result).toEqual({ + auroraSessionId: "aurora-sess-1", + jwtToken: "jwt-abc", + liveboardName: "Saved LB", + }); + }); + + it("uses the configured locale", async () => { + (fetch as any).mockResolvedValue( + makeJsonResponse({ + success: true, + session_id: "s", + jwt_token: "j", + }), + ); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER, "fr-FR"); + await client.createAuroraSession(bachSession); + + const body = JSON.parse((fetch as any).mock.calls[0][1].body); + expect(body.locale).toBe("fr-FR"); + }); + + it("sends Authorization, user-agent override, and JSON Accept", async () => { + (fetch as any).mockResolvedValue( + makeJsonResponse({ + success: true, + session_id: "s", + jwt_token: "j", + }), + ); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await client.createAuroraSession(bachSession); + + const headers = (fetch as any).mock.calls[0][1].headers as Record< + string, + string + >; + expect(headers.Authorization).toBe(`Bearer ${BEARER}`); + expect(headers["user-agent"]).toBe("ThoughtSpot-ts-client"); + expect(headers.Accept).toBe("application/json"); + expect(headers["Content-Type"]).toBe("application/json"); + // /init must not carry the session token header — only later calls do. + expect(headers["X-Aurora-Session-Token"]).toBeUndefined(); + }); + + it("throws with status + body on non-ok HTTP response", async () => { + (fetch as any).mockResolvedValue(makeErrorResponse(502, "upstream down")); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await expect(client.createAuroraSession(bachSession)).rejects.toThrow( + "createAuroraSession failed (502): upstream down", + ); + }); + + it("throws if the payload is missing session_id or jwt_token", async () => { + (fetch as any).mockResolvedValue( + makeJsonResponse({ success: true, jwt_token: "j" }), + ); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await expect(client.createAuroraSession(bachSession)).rejects.toThrow( + /createAuroraSession failed/, + ); + }); + + it("throws and surfaces error list when success=false", async () => { + (fetch as any).mockResolvedValue( + makeJsonResponse({ + success: false, + errors: ["boom-1", "boom-2"], + }), + ); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await expect(client.createAuroraSession(bachSession)).rejects.toThrow( + "createAuroraSession failed: boom-1; boom-2", + ); + }); + + it("falls back to message when errors array is absent", async () => { + (fetch as any).mockResolvedValue( + makeJsonResponse({ success: false, message: "no good" }), + ); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await expect(client.createAuroraSession(bachSession)).rejects.toThrow( + "createAuroraSession failed: no good", + ); + }); + + it("returns undefined liveboardName when the response omits it", async () => { + (fetch as any).mockResolvedValue( + makeJsonResponse({ + success: true, + session_id: "s", + jwt_token: "j", + }), + ); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + const result = await client.createAuroraSession(bachSession); + expect(result.liveboardName).toBeUndefined(); + }); + }); + + describe("submitAuroraQuery", () => { + const auroraCtx: AuroraSessionContext = { + auroraSessionId: "aurora-sess-1", + auroraJwtToken: "jwt-xyz", + transactionId: "txn-9", + generationNumber: "3", + liveboardId: "lb-1", + }; + + it("POSTs to /aurora/chat/stream with the message and session ids", async () => { + const sseResponse = makeJsonResponse({}, { ok: true, status: 200 }); + (fetch as any).mockResolvedValue(sseResponse); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + const result = await client.submitAuroraQuery( + auroraCtx, + "How is revenue?", + ); + + expect(fetch).toHaveBeenCalledTimes(1); + const [url, options] = (fetch as any).mock.calls[0]; + expect(url).toBe(`${INSTANCE_URL}/aurora/chat/stream`); + expect(options.method).toBe("POST"); + + const body = JSON.parse(options.body); + expect(body.message).toBe("How is revenue?"); + expect(body.session_id).toEqual({ + transaction_id: "txn-9", + generation_number: "3", + }); + expect(body.liveboard_id).toBe("lb-1"); + expect(body.stream).toBe(true); + expect(body.locale).toBe("en-US"); + + expect(result).toBe(sseResponse); + }); + + it("sends the SSE Accept header and echoes the Aurora session JWT", async () => { + (fetch as any).mockResolvedValue(makeJsonResponse({})); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await client.submitAuroraQuery(auroraCtx, "msg"); + + const headers = (fetch as any).mock.calls[0][1].headers as Record< + string, + string + >; + expect(headers.Accept).toBe("text/event-stream"); + expect(headers["X-Aurora-Session-Token"]).toBe("jwt-xyz"); + expect(headers.Authorization).toBe(`Bearer ${BEARER}`); + expect(headers["user-agent"]).toBe("ThoughtSpot-ts-client"); + }); + + it("throws with status + body on non-ok HTTP response", async () => { + (fetch as any).mockResolvedValue(makeErrorResponse(401, "no auth")); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await expect(client.submitAuroraQuery(auroraCtx, "msg")).rejects.toThrow( + "submitAuroraQuery failed (401): no auth", + ); + }); + + it("propagates network errors from fetch", async () => { + (fetch as any).mockRejectedValue(new Error("offline")); + + const client = new SpotterVizClient(INSTANCE_URL, BEARER); + await expect(client.submitAuroraQuery(auroraCtx, "msg")).rejects.toThrow( + "offline", + ); + }); + }); +}); diff --git a/test/spotterviz/spotterviz-service.spec.ts b/test/spotterviz/spotterviz-service.spec.ts new file mode 100644 index 0000000..02b8f33 --- /dev/null +++ b/test/spotterviz/spotterviz-service.spec.ts @@ -0,0 +1,552 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { SpotterVizClient } from "../../src/spotterviz/spotterviz-client"; +import { SpotterVizService } from "../../src/spotterviz/spotterviz-service"; +import * as sseStreamModule from "../../src/spotterviz/spotterviz-sse-stream"; +import type { + AuroraSessionContext, + SpotterVizEvent, +} from "../../src/spotterviz/types"; +import type { StorageServiceClient } from "../../src/storage-service/storage-service"; +import type { ThoughtSpotService } from "../../src/thoughtspot/thoughtspot-service"; + +// --------------------------------------------------------------------------- +// Test fixtures +// --------------------------------------------------------------------------- + +const SPOTTER_VIZ_SESSION_ID = "aurora-sess-1"; +const BACH_SESSION = { transactionId: "txn-1", generationNumber: "1" }; +const AURORA_INIT = { + auroraSessionId: SPOTTER_VIZ_SESSION_ID, + jwtToken: "jwt-1", + liveboardName: "Saved LB", +}; + +type TsServiceMock = ThoughtSpotService & { + createEmptyLiveboard: ReturnType; + createBachPinboardSession: ReturnType; + saveBachPinboard: ReturnType; +}; + +type StorageMock = StorageServiceClient & { + getMetadata: ReturnType; + updateMetadata: ReturnType; + appendEvents: ReturnType; + getNewEvents: ReturnType; +}; + +type ClientMock = SpotterVizClient & { + instanceUrl: string; + createAuroraSession: ReturnType; + submitAuroraQuery: ReturnType; +}; + +function makeTsService(): TsServiceMock { + return { + createEmptyLiveboard: vi.fn().mockResolvedValue({ liveboardId: "lb-new" }), + createBachPinboardSession: vi.fn().mockResolvedValue(BACH_SESSION), + saveBachPinboard: vi.fn().mockResolvedValue(undefined), + } as unknown as TsServiceMock; +} + +function makeStorage(): StorageMock { + return { + getMetadata: vi.fn(), + updateMetadata: vi.fn().mockResolvedValue({}), + appendEvents: vi.fn().mockResolvedValue(undefined), + getNewEvents: vi.fn(), + } as unknown as StorageMock; +} + +function makeClient(): ClientMock { + return { + instanceUrl: "https://ts.example.com", + createAuroraSession: vi.fn().mockResolvedValue(AURORA_INIT), + submitAuroraQuery: vi.fn(), + } as unknown as ClientMock; +} + +// --------------------------------------------------------------------------- +// createSession +// --------------------------------------------------------------------------- + +describe("SpotterVizService.createSession", () => { + let ts: TsServiceMock; + let storage: StorageMock; + let client: ClientMock; + let svc: SpotterVizService; + + beforeEach(() => { + ts = makeTsService(); + storage = makeStorage(); + client = makeClient(); + svc = new SpotterVizService(ts, storage, client); + }); + + it("creates a new liveboard when newLiveboardName is provided", async () => { + const result = await svc.createSession({ newLiveboardName: "My LB" }); + + expect(ts.createEmptyLiveboard).toHaveBeenCalledWith("My LB"); + expect(ts.createBachPinboardSession).toHaveBeenCalledWith("lb-new"); + expect(client.createAuroraSession).toHaveBeenCalledWith(BACH_SESSION); + expect(result.spotterVizSessionId).toBe(SPOTTER_VIZ_SESSION_ID); + expect(result.liveboardId).toBe("lb-new"); + }); + + it("reuses existingLiveboardId without creating a new liveboard", async () => { + const result = await svc.createSession({ + existingLiveboardId: "lb-existing", + }); + + expect(ts.createEmptyLiveboard).not.toHaveBeenCalled(); + expect(ts.createBachPinboardSession).toHaveBeenCalledWith("lb-existing"); + expect(result.liveboardId).toBe("lb-existing"); + }); + + it("throws when neither newLiveboardName nor existingLiveboardId is provided", async () => { + await expect(svc.createSession({})).rejects.toThrow( + "Could not resolve a liveboard id", + ); + }); + + it("persists the Aurora session context under the aurora session id", async () => { + await svc.createSession({ newLiveboardName: "My LB" }); + + expect(storage.updateMetadata).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + expect.objectContaining({ + auroraSessionId: SPOTTER_VIZ_SESSION_ID, + auroraJwtToken: "jwt-1", + transactionId: BACH_SESSION.transactionId, + generationNumber: BACH_SESSION.generationNumber, + liveboardId: "lb-new", + liveboardName: "Saved LB", + }), + ); + }); + + it("falls back to newLiveboardName when aurora response omits liveboardName", async () => { + client.createAuroraSession.mockResolvedValueOnce({ + auroraSessionId: SPOTTER_VIZ_SESSION_ID, + jwtToken: "jwt-1", + }); + + const result = await svc.createSession({ newLiveboardName: "Caller LB" }); + + expect(result.liveboardName).toBe("Caller LB"); + const persistedCtx = storage.updateMetadata.mock + .calls[0][1] as AuroraSessionContext; + expect(persistedCtx.liveboardName).toBe("Caller LB"); + }); + + it("propagates errors from createBachPinboardSession", async () => { + ts.createBachPinboardSession.mockRejectedValueOnce(new Error("bach down")); + + await expect(svc.createSession({ newLiveboardName: "x" })).rejects.toThrow( + "bach down", + ); + expect(client.createAuroraSession).not.toHaveBeenCalled(); + }); + + it("propagates errors from the Aurora client", async () => { + client.createAuroraSession.mockRejectedValueOnce(new Error("aurora 500")); + + await expect(svc.createSession({ newLiveboardName: "x" })).rejects.toThrow( + "aurora 500", + ); + expect(storage.updateMetadata).not.toHaveBeenCalled(); + }); +}); + +// --------------------------------------------------------------------------- +// submitQuery +// --------------------------------------------------------------------------- + +describe("SpotterVizService.submitQuery", () => { + let ts: TsServiceMock; + let storage: StorageMock; + let client: ClientMock; + let svc: SpotterVizService; + + const baseCtx: AuroraSessionContext = { + auroraSessionId: SPOTTER_VIZ_SESSION_ID, + auroraJwtToken: "jwt-1", + transactionId: "txn-1", + generationNumber: "1", + liveboardId: "lb-1", + pollCount: 3, + }; + + beforeEach(() => { + ts = makeTsService(); + storage = makeStorage(); + client = makeClient(); + svc = new SpotterVizService(ts, storage, client); + storage.getMetadata.mockResolvedValue(baseCtx); + // Make the SSE drain a no-op to keep these tests deterministic. + vi.spyOn(sseStreamModule, "processAuroraSseStream").mockResolvedValue( + undefined, + ); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + function makeMockResponseWithReader(): Response { + return { + body: { + getReader: () => + ({ + read: vi.fn().mockResolvedValue({ done: true, value: undefined }), + }) as unknown as ReadableStreamDefaultReader, + }, + } as unknown as Response; + } + + it("loads the stored context, resets pollCount, and submits the Aurora query", async () => { + client.submitAuroraQuery.mockResolvedValue(makeMockResponseWithReader()); + + const { streamPromise } = await svc.submitQuery({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + message: "Show me revenue", + }); + await streamPromise; + + expect(storage.getMetadata).toHaveBeenCalledWith(SPOTTER_VIZ_SESSION_ID); + expect(storage.updateMetadata).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + { pollCount: 0 }, + ); + expect(client.submitAuroraQuery).toHaveBeenCalledWith( + baseCtx, + "Show me revenue", + ); + }); + + it("returns a streamPromise that wraps processAuroraSseStream", async () => { + client.submitAuroraQuery.mockResolvedValue(makeMockResponseWithReader()); + + const { streamPromise } = await svc.submitQuery({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + message: "x", + }); + await streamPromise; + + expect(sseStreamModule.processAuroraSseStream).toHaveBeenCalledTimes(1); + expect( + (sseStreamModule.processAuroraSseStream as any).mock.calls[0][0], + ).toBe(SPOTTER_VIZ_SESSION_ID); + }); + + it("throws and best-effort marks the session done when the response has no body", async () => { + client.submitAuroraQuery.mockResolvedValue({ body: null } as Response); + + await expect( + svc.submitQuery({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + message: "x", + }), + ).rejects.toThrow("Aurora /chat/stream returned no response body"); + + // Best-effort mark-done: appendEvents called with empty array + true. + expect(storage.appendEvents).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + [], + true, + ); + }); + + it("throws and best-effort marks the session done when submitAuroraQuery fails", async () => { + client.submitAuroraQuery.mockRejectedValueOnce(new Error("aurora 500")); + + await expect( + svc.submitQuery({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + message: "x", + }), + ).rejects.toThrow("aurora 500"); + + expect(storage.appendEvents).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + [], + true, + ); + }); + + it("swallows secondary errors from the best-effort mark-done", async () => { + client.submitAuroraQuery.mockRejectedValueOnce(new Error("aurora 500")); + storage.appendEvents.mockRejectedValueOnce(new Error("mark-done failed")); + + // The original error must still surface; mark-done failure must not mask it. + await expect( + svc.submitQuery({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + message: "x", + }), + ).rejects.toThrow("aurora 500"); + }); +}); + +// --------------------------------------------------------------------------- +// saveLiveboard +// --------------------------------------------------------------------------- + +describe("SpotterVizService.saveLiveboard", () => { + let ts: TsServiceMock; + let storage: StorageMock; + let client: ClientMock; + let svc: SpotterVizService; + + beforeEach(() => { + ts = makeTsService(); + storage = makeStorage(); + client = makeClient(); + svc = new SpotterVizService(ts, storage, client); + }); + + it("commits the BACH session and returns a usable liveboard URL", async () => { + storage.getMetadata.mockResolvedValueOnce({ + auroraSessionId: SPOTTER_VIZ_SESSION_ID, + auroraJwtToken: "jwt-1", + transactionId: "txn-9", + generationNumber: "4", + liveboardId: "lb-7", + } as AuroraSessionContext); + + const result = await svc.saveLiveboard({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + + expect(ts.saveBachPinboard).toHaveBeenCalledWith("txn-9", "4"); + expect(result.liveboardId).toBe("lb-7"); + expect(result.liveboardUrl).toBe("https://ts.example.com/#/pinboard/lb-7"); + }); + + it("throws when the stored context has no liveboardId", async () => { + storage.getMetadata.mockResolvedValueOnce({ + auroraSessionId: SPOTTER_VIZ_SESSION_ID, + auroraJwtToken: "jwt-1", + transactionId: "txn-9", + generationNumber: "4", + } as AuroraSessionContext); + + await expect( + svc.saveLiveboard({ spotterVizSessionId: SPOTTER_VIZ_SESSION_ID }), + ).rejects.toThrow("missing a liveboardId"); + expect(ts.saveBachPinboard).not.toHaveBeenCalled(); + }); + + it("propagates errors from saveBachPinboard", async () => { + storage.getMetadata.mockResolvedValueOnce({ + auroraSessionId: SPOTTER_VIZ_SESSION_ID, + auroraJwtToken: "jwt-1", + transactionId: "t", + generationNumber: "g", + liveboardId: "lb-1", + } as AuroraSessionContext); + ts.saveBachPinboard.mockRejectedValueOnce(new Error("bach save failed")); + + await expect( + svc.saveLiveboard({ spotterVizSessionId: SPOTTER_VIZ_SESSION_ID }), + ).rejects.toThrow("bach save failed"); + }); +}); + +// --------------------------------------------------------------------------- +// getUpdates +// --------------------------------------------------------------------------- + +describe("SpotterVizService.getUpdates", () => { + let ts: TsServiceMock; + let storage: StorageMock; + let client: ClientMock; + let svc: SpotterVizService; + + beforeEach(() => { + vi.useFakeTimers(); + ts = makeTsService(); + storage = makeStorage(); + client = makeClient(); + svc = new SpotterVizService(ts, storage, client); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + const ev = (event_type: string): SpotterVizEvent => ({ + event_type, + data: {}, + }); + + it("fast-returns without sleeping when the initial peek shows the turn is done", async () => { + storage.getNewEvents.mockResolvedValueOnce({ + messages: [ev("text.delta")], + isDone: true, + }); + + const promise = svc.getUpdates({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + // No timer advance needed — initial peek is the only call. + const result = await promise; + + expect(result.isDone).toBe(true); + expect(result.updates).toHaveLength(1); + expect(storage.getNewEvents).toHaveBeenCalledTimes(1); + // pollCount reset on done. + expect(storage.updateMetadata).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + { pollCount: 0 }, + ); + }); + + it("sleeps the first backoff step (2s) when no prior poll has been recorded", async () => { + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + storage.getNewEvents.mockResolvedValueOnce({ + messages: [ev("text.delta")], + isDone: false, + }); + storage.getMetadata.mockResolvedValueOnce({} as AuroraSessionContext); + + const promise = svc.getUpdates({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + + // Drive past the first backoff step. + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + + expect(result.isDone).toBe(false); + expect(result.updates).toHaveLength(1); + // pollCount advanced 0 -> 1. + expect(storage.updateMetadata).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + { pollCount: 1 }, + ); + }); + + it("uses the backoff sequence step indexed by the stored pollCount", async () => { + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + // pollCount=2 -> wait sequence index 2 -> 8s. + storage.getMetadata.mockResolvedValueOnce({ + pollCount: 2, + } as AuroraSessionContext); + + const promise = svc.getUpdates({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + + // Advance less than 8s — promise should not yet resolve. + await vi.advanceTimersByTimeAsync(7999); + // Just past the boundary. + await vi.advanceTimersByTimeAsync(2); + + await promise; + + expect(storage.updateMetadata).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + { pollCount: 3 }, + ); + }); + + it("clamps the wait time to the last backoff step (16s) once pollCount exceeds the sequence", async () => { + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + // pollCount=99 -> still uses the last index (16s). + storage.getMetadata.mockResolvedValueOnce({ + pollCount: 99, + } as AuroraSessionContext); + + const promise = svc.getUpdates({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + await vi.advanceTimersByTimeAsync(16_000); + await promise; + + // pollCount keeps incrementing — important for span attributes / observability. + expect(storage.updateMetadata).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + { pollCount: 100 }, + ); + }); + + it("merges events from both peek and the followup poll", async () => { + storage.getNewEvents.mockResolvedValueOnce({ + messages: [ev("a")], + isDone: false, + }); + storage.getNewEvents.mockResolvedValueOnce({ + messages: [ev("b"), ev("c")], + isDone: false, + }); + storage.getMetadata.mockResolvedValueOnce({} as AuroraSessionContext); + + const promise = svc.getUpdates({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + + expect(result.updates.map((u) => u.event_type)).toEqual(["a", "b", "c"]); + }); + + it("resets pollCount when the followup poll reports the turn is done", async () => { + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + storage.getNewEvents.mockResolvedValueOnce({ + messages: [ev("done")], + isDone: true, + }); + storage.getMetadata.mockResolvedValueOnce({ + pollCount: 1, + } as AuroraSessionContext); + + const promise = svc.getUpdates({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + await vi.advanceTimersByTimeAsync(4000); + const result = await promise; + + expect(result.isDone).toBe(true); + const patches = storage.updateMetadata.mock.calls.map((c) => c[1]); + // One advance (1 -> 2), then a reset back to 0. + expect(patches).toEqual( + expect.arrayContaining([{ pollCount: 2 }, { pollCount: 0 }]), + ); + }); + + it("treats missing pollCount in metadata as 0", async () => { + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + storage.getNewEvents.mockResolvedValueOnce({ messages: [], isDone: false }); + storage.getMetadata.mockResolvedValueOnce({ + // pollCount absent + } as AuroraSessionContext); + + const promise = svc.getUpdates({ + spotterVizSessionId: SPOTTER_VIZ_SESSION_ID, + }); + await vi.advanceTimersByTimeAsync(2000); + await promise; + + expect(storage.updateMetadata).toHaveBeenCalledWith( + SPOTTER_VIZ_SESSION_ID, + { pollCount: 1 }, + ); + }); + + it("swallows errors from the pollCount reset so already-drained events are not lost", async () => { + storage.getNewEvents.mockResolvedValueOnce({ + messages: [ev("done")], + isDone: true, + }); + storage.updateMetadata.mockRejectedValueOnce(new Error("reset failed")); + + await expect( + svc.getUpdates({ spotterVizSessionId: SPOTTER_VIZ_SESSION_ID }), + ).resolves.toEqual({ + updates: [{ event_type: "done", data: {} }], + isDone: true, + }); + }); +}); diff --git a/test/spotterviz/spotterviz-sse-stream.spec.ts b/test/spotterviz/spotterviz-sse-stream.spec.ts new file mode 100644 index 0000000..b976e9e --- /dev/null +++ b/test/spotterviz/spotterviz-sse-stream.spec.ts @@ -0,0 +1,313 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { processAuroraSseStream } from "../../src/spotterviz/spotterviz-sse-stream"; +import type { + AuroraSessionContext, + SpotterVizEvent, +} from "../../src/spotterviz/types"; +import type { StorageServiceClient } from "../../src/storage-service/storage-service"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const CONVERSATION_ID = "conv-abc"; + +type StorageMock = StorageServiceClient & { + appendEvents: ReturnType; + updateMetadata: ReturnType; +}; + +function makeStorageMock(): StorageMock { + return { + appendEvents: vi.fn().mockResolvedValue(undefined), + updateMetadata: vi.fn().mockResolvedValue({}), + } as unknown as StorageMock; +} + +/** + * Build a reader that yields a fixed sequence of UTF-8 chunks, then signals done. + * Each chunk is delivered as a separate `read()` resolution so we can exercise + * the buffer-across-chunks logic in `processAuroraSseStream`. + */ +function makeReader(chunks: string[]): ReadableStreamDefaultReader { + const encoder = new TextEncoder(); + const queue = [...chunks]; + return { + read: vi.fn(async () => { + if (queue.length === 0) { + return { + done: true, + value: undefined, + } as ReadableStreamReadResult; + } + const next = queue.shift() as string; + return { + done: false, + value: encoder.encode(next), + } as ReadableStreamReadResult; + }), + releaseLock: vi.fn(), + cancel: vi.fn(), + closed: Promise.resolve(undefined), + } as unknown as ReadableStreamDefaultReader; +} + +/** Build a reader whose first `read()` throws. */ +function makeThrowingReader( + err: Error, +): ReadableStreamDefaultReader { + return { + read: vi.fn().mockRejectedValue(err), + releaseLock: vi.fn(), + cancel: vi.fn(), + closed: Promise.resolve(undefined), + } as unknown as ReadableStreamDefaultReader; +} + +const sseFrame = (eventType: string, data: unknown): string => + `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("processAuroraSseStream", () => { + let storage: StorageMock; + + beforeEach(() => { + storage = makeStorageMock(); + }); + + it("parses a single SSE frame and appends it to storage; marks done when the stream closes", async () => { + const reader = makeReader([ + sseFrame("text.delta", { + event_type: "text.delta", + data: { text: "Hello" }, + message_id: "m-1", + idx: 0, + timestamp: "2024-01-01T00:00:00Z", + }), + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + expect(storage.appendEvents).toHaveBeenCalledTimes(2); + const firstCall = storage.appendEvents.mock.calls[0]; + expect(firstCall[0]).toBe(CONVERSATION_ID); + const events = firstCall[1] as SpotterVizEvent[]; + expect(events).toHaveLength(1); + expect(events[0].event_type).toBe("text.delta"); + expect(events[0].data).toEqual({ text: "Hello" }); + expect(events[0].message_id).toBe("m-1"); + expect(events[0].idx).toBe(0); + expect(events[0].timestamp).toBe("2024-01-01T00:00:00Z"); + expect(firstCall[2]).toBeFalsy(); + + const closingCall = storage.appendEvents.mock.calls[1]; + expect(closingCall[1]).toEqual([]); + expect(closingCall[2]).toBe(true); + }); + + it("parses multiple frames batched in one chunk and appends them in order", async () => { + const reader = makeReader([ + sseFrame("a", { data: { i: 1 } }) + + sseFrame("b", { data: { i: 2 } }) + + sseFrame("c", { data: { i: 3 } }), + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + const events = storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]; + expect(events.map((e) => e.event_type)).toEqual(["a", "b", "c"]); + }); + + it("buffers partial frames split across chunks until a frame boundary arrives", async () => { + const frame = sseFrame("text.delta", { data: { text: "Hello world" } }); + const split = Math.floor(frame.length / 2); + + const reader = makeReader([frame.slice(0, split), frame.slice(split)]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + const events = storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]; + expect(events).toHaveLength(1); + expect(events[0].event_type).toBe("text.delta"); + expect(events[0].data).toEqual({ text: "Hello world" }); + }); + + it("ignores SSE comments and blank frames (heartbeats)", async () => { + const reader = makeReader([ + `: keepalive\n\n${sseFrame("a", { data: { i: 1 } })}\n\n`, + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + const events = storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]; + expect(events).toHaveLength(1); + expect(events[0].event_type).toBe("a"); + }); + + it("drops frames whose data payload is unparseable JSON but keeps streaming", async () => { + const malformedFrame = "event: bad\ndata: not-json-{}\n\n"; + const goodFrame = sseFrame("good", { data: { ok: true } }); + + const reader = makeReader([malformedFrame + goodFrame]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + // Only the good frame should have been appended. + expect(storage.appendEvents).toHaveBeenCalledTimes(2); + const events = storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]; + expect(events).toHaveLength(1); + expect(events[0].event_type).toBe("good"); + }); + + it("falls back to the data payload's event_type when SSE 'event:' header is missing", async () => { + // No `event:` line — only `data:`. event_type should fall back to data.event_type. + const reader = makeReader([ + 'data: {"event_type":"inferred","data":{"x":1}}\n\n', + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + const events = storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]; + expect(events).toHaveLength(1); + expect(events[0].event_type).toBe("inferred"); + }); + + it("falls back to 'unknown' when neither SSE header nor payload provides event_type", async () => { + const reader = makeReader(['data: {"data":{"x":1}}\n\n']); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + const events = storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]; + expect(events[0].event_type).toBe("unknown"); + }); + + it("patches AuroraSessionContext.generationNumber on control.action lb_refresh frames", async () => { + const reader = makeReader([ + sseFrame("control.action", { + data: { + action: "lb_refresh", + metadata: { new_gen_number: 42 }, + }, + }), + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + expect(storage.updateMetadata).toHaveBeenCalledTimes(1); + const [convId, patch] = storage.updateMetadata.mock.calls[0] as [ + string, + Partial, + ]; + expect(convId).toBe(CONVERSATION_ID); + // Numeric new_gen_number must be coerced to string to match BACH session shape. + expect(patch.generationNumber).toBe("42"); + }); + + it("does not patch generationNumber for control.action of a different kind", async () => { + const reader = makeReader([ + sseFrame("control.action", { + data: { action: "something_else", metadata: { new_gen_number: 9 } }, + }), + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + expect(storage.updateMetadata).not.toHaveBeenCalled(); + }); + + it("does not patch when lb_refresh metadata lacks new_gen_number", async () => { + const reader = makeReader([ + sseFrame("control.action", { + data: { action: "lb_refresh", metadata: {} }, + }), + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + expect(storage.updateMetadata).not.toHaveBeenCalled(); + }); + + it("swallows errors from updateMetadata so a failing patch does not abort the stream", async () => { + storage.updateMetadata.mockRejectedValueOnce(new Error("patch failed")); + const reader = makeReader([ + sseFrame("control.action", { + data: { action: "lb_refresh", metadata: { new_gen_number: 5 } }, + }) + sseFrame("text.delta", { data: { text: "still flowing" } }), + ]); + + await expect( + processAuroraSseStream(CONVERSATION_ID, reader, storage), + ).resolves.toBeUndefined(); + + // Events still appended despite the patch error. + const events = storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]; + expect(events.map((e) => e.event_type)).toEqual([ + "control.action", + "text.delta", + ]); + // And the stream-close marker still fires. + const last = storage.appendEvents.mock.calls.at(-1); + expect(last?.[2]).toBe(true); + }); + + it("marks the conversation done with isDone=true on a clean stream close even when no frames were sent", async () => { + const reader = makeReader([]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + // Only the closing call. + expect(storage.appendEvents).toHaveBeenCalledTimes(1); + expect(storage.appendEvents.mock.calls[0]).toEqual([ + CONVERSATION_ID, + [], + true, + ]); + }); + + it("on a reader read() error, marks the conversation done as a best-effort", async () => { + const reader = makeThrowingReader(new Error("network blew up")); + + await expect( + processAuroraSseStream(CONVERSATION_ID, reader, storage), + ).resolves.toBeUndefined(); + + expect(storage.appendEvents).toHaveBeenCalledWith( + CONVERSATION_ID, + [], + true, + ); + }); + + it("swallows the secondary error if marking-done itself fails after a stream error", async () => { + const reader = makeThrowingReader(new Error("stream error")); + storage.appendEvents.mockRejectedValue(new Error("mark-done failed")); + + await expect( + processAuroraSseStream(CONVERSATION_ID, reader, storage), + ).resolves.toBeUndefined(); + }); + + it("does not append an empty-events batch between frames (only the stream-close marker)", async () => { + // One frame, then a heartbeat-only chunk, then close. + const reader = makeReader([ + sseFrame("a", { data: { i: 1 } }), + ": heartbeat\n\n", + ]); + + await processAuroraSseStream(CONVERSATION_ID, reader, storage); + + // First call has the real event; second is the close (no third empty-events batch). + expect(storage.appendEvents).toHaveBeenCalledTimes(2); + expect( + (storage.appendEvents.mock.calls[0][1] as SpotterVizEvent[]).length, + ).toBe(1); + expect(storage.appendEvents.mock.calls[1]).toEqual([ + CONVERSATION_ID, + [], + true, + ]); + }); +}); diff --git a/test/storage-service/storage-service.spec.ts b/test/storage-service/storage-service.spec.ts index 981954c..6adea78 100644 --- a/test/storage-service/storage-service.spec.ts +++ b/test/storage-service/storage-service.spec.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { StorageServiceClient } from "../../src/storage-service/storage-service"; import type { Message, @@ -180,7 +180,7 @@ describe("StorageServiceClient", () => { await expect( client.appendMessages(CONVERSATION_ID, [textMessage]), - ).rejects.toThrow("Failed to append messages (500)"); + ).rejects.toThrow("Failed to append events (500)"); }); it("includes the error body in the thrown error message", async () => { @@ -245,7 +245,7 @@ describe("StorageServiceClient", () => { client = new StorageServiceClient(namespaceMock, TOKEN_HASH); await expect(client.getNewMessages(CONVERSATION_ID)).rejects.toThrow( - "Failed to get messages (404)", + "Failed to get events (404)", ); }); @@ -259,6 +259,206 @@ describe("StorageServiceClient", () => { }); }); + // ------------------------------------------------------------------------- + // getMetadata + // ------------------------------------------------------------------------- + + describe("getMetadata", () => { + it("sends GET to /storage//metadata", async () => { + namespaceMock = makeNamespaceMock({ foo: "bar" }); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await client.getMetadata(CONVERSATION_ID); + + const req = lastRequest(); + expect(req.url).toBe( + `https://internal/storage/${CONVERSATION_ID}/metadata`, + ); + expect(req.method).toBe("GET"); + }); + + it("returns the parsed metadata object", async () => { + const metadata = { foo: "bar", count: 7, nested: { a: 1 } }; + namespaceMock = makeNamespaceMock(metadata); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + const result = await client.getMetadata(CONVERSATION_ID); + + expect(result).toEqual(metadata); + }); + + it("throws when the server returns a non-ok status", async () => { + namespaceMock = makeNamespaceMock("Conversation not found", 404); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await expect(client.getMetadata(CONVERSATION_ID)).rejects.toThrow( + "Failed to get conversation metadata (404)", + ); + }); + + it("includes the error body in the thrown error message", async () => { + namespaceMock = makeNamespaceMock("Conversation not found", 404); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await expect(client.getMetadata(CONVERSATION_ID)).rejects.toThrow( + "Conversation not found", + ); + }); + }); + + // ------------------------------------------------------------------------- + // updateMetadata + // ------------------------------------------------------------------------- + + describe("updateMetadata", () => { + it("sends PATCH to /storage//metadata with the patch as JSON body", async () => { + const patch = { count: 5, status: "active" }; + namespaceMock = makeNamespaceMock({ count: 5, status: "active" }); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await client.updateMetadata(CONVERSATION_ID, patch); + + const req = lastRequest(); + expect(req.url).toBe( + `https://internal/storage/${CONVERSATION_ID}/metadata`, + ); + expect(req.method).toBe("PATCH"); + expect(await req.json()).toEqual(patch); + }); + + it("returns the merged metadata from the response", async () => { + const merged = { existing: 1, count: 5 }; + namespaceMock = makeNamespaceMock(merged); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + const result = await client.updateMetadata( + CONVERSATION_ID, + { count: 5 }, + ); + + expect(result).toEqual(merged); + }); + + it("sends Content-Type: application/json", async () => { + await client.updateMetadata(CONVERSATION_ID, { x: 1 }); + expect(lastRequest().headers.get("Content-Type")).toBe( + "application/json", + ); + }); + + it("throws when the server returns a non-ok status", async () => { + namespaceMock = makeNamespaceMock("Conversation not found", 404); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await expect( + client.updateMetadata(CONVERSATION_ID, { foo: "bar" }), + ).rejects.toThrow("Failed to update conversation metadata (404)"); + }); + }); + + // ------------------------------------------------------------------------- + // appendEvents / getNewEvents — generic SpotterViz path + // ------------------------------------------------------------------------- + + describe("appendEvents (generic)", () => { + interface CustomEvent { + kind: string; + payload: Record; + } + + it("sends POST to /storage//append with events under the 'messages' wire field", async () => { + const events: CustomEvent[] = [ + { kind: "open", payload: { id: 1 } }, + { kind: "close", payload: { reason: "ok" } }, + ]; + + await client.appendEvents(CONVERSATION_ID, events); + + const req = lastRequest(); + expect(req.url).toBe( + `https://internal/storage/${CONVERSATION_ID}/append`, + ); + expect(req.method).toBe("POST"); + const body = (await req.json()) as { + messages: CustomEvent[]; + isDone: boolean; + }; + expect(body.messages).toEqual(events); + expect(body.isDone).toBe(false); + }); + + it("sends isDone=true when specified", async () => { + await client.appendEvents(CONVERSATION_ID, [], true); + + const body = (await lastRequest().json()) as { isDone: boolean }; + expect(body.isDone).toBe(true); + }); + + it("supports empty event arrays (used to mark done-only)", async () => { + await client.appendEvents(CONVERSATION_ID, [], true); + + const body = (await lastRequest().json()) as { + messages: CustomEvent[]; + isDone: boolean; + }; + expect(body.messages).toEqual([]); + expect(body.isDone).toBe(true); + }); + + it("throws when the server returns a non-ok status", async () => { + namespaceMock = makeNamespaceMock("Cannot append", 400); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await expect( + client.appendEvents(CONVERSATION_ID, [ + { kind: "x", payload: {} }, + ]), + ).rejects.toThrow("Failed to append events (400)"); + }); + }); + + describe("getNewEvents (generic)", () => { + interface CustomEvent { + kind: string; + } + + it("sends GET to /storage//messages", async () => { + namespaceMock = makeNamespaceMock({ messages: [], isDone: false }); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await client.getNewEvents(CONVERSATION_ID); + + const req = lastRequest(); + expect(req.url).toBe( + `https://internal/storage/${CONVERSATION_ID}/messages`, + ); + expect(req.method).toBe("GET"); + }); + + it("returns the parsed { messages, isDone } payload typed to the caller's T", async () => { + const payload = { + messages: [{ kind: "a" }, { kind: "b" }] as CustomEvent[], + isDone: true, + }; + namespaceMock = makeNamespaceMock(payload); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + const result = await client.getNewEvents(CONVERSATION_ID); + + expect(result).toEqual(payload); + expect(result.messages[0].kind).toBe("a"); + }); + + it("throws when the server returns a non-ok status", async () => { + namespaceMock = makeNamespaceMock("Conversation not found", 404); + client = new StorageServiceClient(namespaceMock, TOKEN_HASH); + + await expect( + client.getNewEvents(CONVERSATION_ID), + ).rejects.toThrow("Failed to get events (404)"); + }); + }); + // ------------------------------------------------------------------------- // DO instance keying — accessTokenHashUrlSafe isolation // ------------------------------------------------------------------------- diff --git a/test/thoughtspot/thoughtspot-client.spec.ts b/test/thoughtspot/thoughtspot-client.spec.ts index dd6d7d7..687cb69 100644 --- a/test/thoughtspot/thoughtspot-client.spec.ts +++ b/test/thoughtspot/thoughtspot-client.spec.ts @@ -1,11 +1,11 @@ -import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; -import { getThoughtSpotClient } from "../../src/thoughtspot/thoughtspot-client"; import { - createBearerAuthenticationConfig, ThoughtSpotRestApi, + createBearerAuthenticationConfig, } from "@thoughtspot/rest-api-sdk"; import type { ResponseContext } from "@thoughtspot/rest-api-sdk"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import YAML from "yaml"; +import { getThoughtSpotClient } from "../../src/thoughtspot/thoughtspot-client"; // Mock the ThoughtSpot REST API SDK vi.mock("@thoughtspot/rest-api-sdk", () => ({ @@ -827,6 +827,255 @@ describe("ThoughtSpot Client", () => { }); }); + describe("createBachPinboardSession", () => { + let client: any; + + beforeEach(() => { + client = getThoughtSpotClient(mockInstanceUrl, mockBearerToken) as any; + }); + + it("POSTs to /callosum/v1/bach/pinboard/ with a LOAD_PINBOARD request and a fresh client transaction id", async () => { + const mockResponse = { + status: { statusCode: "OK" }, + pinboardSession: { + transactionId: "server-txn-1", + generationNumber: 7, + }, + }; + + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue(mockResponse), + }); + + const result = await client.createBachPinboardSession({ + liveboardId: "lb-abc", + }); + + expect(fetch).toHaveBeenCalledWith( + `${mockInstanceUrl}/callosum/v1/bach/pinboard/`, + expect.objectContaining({ + method: "POST", + headers: expect.objectContaining({ + "Content-Type": "application/json", + Accept: "application/json", + "user-agent": "ThoughtSpot-ts-client", + Authorization: `Bearer ${mockBearerToken}`, + }), + }), + ); + + const body = JSON.parse((fetch as any).mock.calls[0][1].body); + expect(body.pinboardSession.transactionId).toEqual(expect.any(String)); + expect(body.pinboardSession.transactionId.length).toBeGreaterThan(0); + expect(body.pinboardRequests).toEqual([ + { + type: "LOAD_PINBOARD", + loadPinboard: { savedPinboardId: "lb-abc" }, + }, + ]); + + // Server-supplied ids, numeric generationNumber is coerced to string. + expect(result).toEqual({ + transactionId: "server-txn-1", + generationNumber: "7", + }); + }); + + it("keeps generationNumber as-is when the server returns a string", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ + status: { statusCode: "OK" }, + pinboardSession: { + transactionId: "t", + generationNumber: "12", + }, + }), + }); + + const result = await client.createBachPinboardSession({ + liveboardId: "lb-1", + }); + expect(result.generationNumber).toBe("12"); + }); + + it("throws with the body text on a non-ok HTTP response", async () => { + (fetch as any).mockResolvedValue({ + ok: false, + status: 500, + text: vi.fn().mockResolvedValue("server exploded"), + }); + + await expect( + client.createBachPinboardSession({ liveboardId: "lb-1" }), + ).rejects.toThrow( + "createBachPinboardSession failed with status 500: server exploded", + ); + }); + + it("throws when the top-level status is non-OK, including the error message", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ + status: { + statusCode: "ERROR", + errorMessage: "liveboard not found", + }, + }), + }); + + await expect( + client.createBachPinboardSession({ liveboardId: "lb-1" }), + ).rejects.toThrow( + "createBachPinboardSession returned non-OK status ERROR: liveboard not found", + ); + }); + + it("falls back to per-request loadPinboard.status.errorMessage when top-level errorMessage is missing", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ + status: { statusCode: "ERROR" }, + pinboardResponses: [ + { + loadPinboard: { + status: { errorMessage: "permission denied" }, + }, + }, + ], + }), + }); + + await expect( + client.createBachPinboardSession({ liveboardId: "lb-1" }), + ).rejects.toThrow("permission denied"); + }); + + it("throws when the response is missing session ids even with OK status", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ + status: { statusCode: "OK" }, + // pinboardSession missing + }), + }); + + await expect( + client.createBachPinboardSession({ liveboardId: "lb-1" }), + ).rejects.toThrow(/missing session ids/); + }); + + it("propagates network errors from fetch", async () => { + (fetch as any).mockRejectedValue(new Error("offline")); + + await expect( + client.createBachPinboardSession({ liveboardId: "lb-1" }), + ).rejects.toThrow("offline"); + }); + }); + + describe("saveBachPinboard", () => { + let client: any; + + beforeEach(() => { + client = getThoughtSpotClient(mockInstanceUrl, mockBearerToken) as any; + }); + + it("POSTs to /callosum/v1/bach/pinboard/ with a SAVE_PINBOARD request and the supplied session ids", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ status: { statusCode: "OK" } }), + }); + + await client.saveBachPinboard({ + transactionId: "t-1", + generationNumber: "5", + }); + + const body = JSON.parse((fetch as any).mock.calls[0][1].body); + expect(body.pinboardSession).toEqual({ + transactionId: "t-1", + generationNumber: "5", + }); + expect(body.pinboardRequests).toEqual([ + { type: "SAVE_PINBOARD", savePinboard: {} }, + ]); + }); + + it("returns undefined on success", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ status: { statusCode: "OK" } }), + }); + + const result = await client.saveBachPinboard({ + transactionId: "t-1", + generationNumber: "5", + }); + expect(result).toBeUndefined(); + }); + + it("throws with the body text on a non-ok HTTP response", async () => { + (fetch as any).mockResolvedValue({ + ok: false, + status: 503, + text: vi.fn().mockResolvedValue("unavailable"), + }); + + await expect( + client.saveBachPinboard({ + transactionId: "t-1", + generationNumber: "5", + }), + ).rejects.toThrow("saveBachPinboard failed with status 503: unavailable"); + }); + + it("throws when the top-level status is non-OK", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ + status: { + statusCode: "ERROR", + errorMessage: "save rejected", + }, + }), + }); + + await expect( + client.saveBachPinboard({ + transactionId: "t-1", + generationNumber: "5", + }), + ).rejects.toThrow( + "saveBachPinboard returned non-OK status ERROR: save rejected", + ); + }); + + it("falls back to per-request savePinboard.status.errorMessage when present", async () => { + (fetch as any).mockResolvedValue({ + ok: true, + json: vi.fn().mockResolvedValue({ + status: { statusCode: "ERROR" }, + pinboardResponses: [ + { + savePinboard: { + status: { errorMessage: "save permission denied" }, + }, + }, + ], + }), + }); + + await expect( + client.saveBachPinboard({ + transactionId: "t-1", + generationNumber: "5", + }), + ).rejects.toThrow("save permission denied"); + }); + }); + describe("GraphQL Queries", () => { it("should have the correct GraphQL mutation structure for GetUnsavedAnswerTML", () => { // This test ensures the GraphQL query is properly structured diff --git a/test/thoughtspot/thoughtspot-service.spec.ts b/test/thoughtspot/thoughtspot-service.spec.ts index 614bd57..5527bc3 100644 --- a/test/thoughtspot/thoughtspot-service.spec.ts +++ b/test/thoughtspot/thoughtspot-service.spec.ts @@ -1852,4 +1852,108 @@ describe("thoughtspot-service", () => { ]); }); }); + + describe("createEmptyLiveboard", () => { + it("imports a minimal liveboard TML and returns the new liveboard id", async () => { + mockClient.importMetadataTML = vi + .fn() + .mockResolvedValue([{ response: { header: { id_guid: "lb-new-1" } } }]); + + const service = new ThoughtSpotService(mockClient); + const result = await service.createEmptyLiveboard("Q3 Review"); + + expect(mockClient.importMetadataTML).toHaveBeenCalledTimes(1); + const call = mockClient.importMetadataTML.mock.calls[0][0]; + expect(call.import_policy).toBe("ALL_OR_NONE"); + expect(call.create_new).toBe(true); + + const tml = JSON.parse(call.metadata_tmls[0]); + expect(tml).toEqual({ + liveboard: { + name: "Q3 Review", + visualizations: [], + layout: { tiles: [] }, + }, + }); + + expect(result).toEqual({ liveboardId: "lb-new-1" }); + }); + + it("throws when the import response is missing id_guid", async () => { + mockClient.importMetadataTML = vi.fn().mockResolvedValue([{}]); + + const service = new ThoughtSpotService(mockClient); + await expect(service.createEmptyLiveboard("LB")).rejects.toThrow( + /id_guid missing/, + ); + }); + + it("propagates errors from importMetadataTML", async () => { + mockClient.importMetadataTML = vi + .fn() + .mockRejectedValue(new Error("import broke")); + + const service = new ThoughtSpotService(mockClient); + await expect(service.createEmptyLiveboard("LB")).rejects.toThrow( + "import broke", + ); + }); + }); + + describe("createBachPinboardSession", () => { + it("delegates to the client and returns the session ids", async () => { + mockClient.createBachPinboardSession = vi.fn().mockResolvedValue({ + transactionId: "txn-1", + generationNumber: "3", + }); + + const service = new ThoughtSpotService(mockClient); + const result = await service.createBachPinboardSession("lb-1"); + + expect(mockClient.createBachPinboardSession).toHaveBeenCalledWith({ + liveboardId: "lb-1", + }); + expect(result).toEqual({ + transactionId: "txn-1", + generationNumber: "3", + }); + }); + + it("propagates errors from the client", async () => { + mockClient.createBachPinboardSession = vi + .fn() + .mockRejectedValue(new Error("bach create failed")); + + const service = new ThoughtSpotService(mockClient); + await expect(service.createBachPinboardSession("lb-1")).rejects.toThrow( + "bach create failed", + ); + }); + }); + + describe("saveBachPinboard", () => { + it("delegates to the client with the txn id + generation number", async () => { + mockClient.saveBachPinboard = vi.fn().mockResolvedValue(undefined); + + const service = new ThoughtSpotService(mockClient); + const result = await service.saveBachPinboard("txn-1", "9"); + + expect(mockClient.saveBachPinboard).toHaveBeenCalledWith({ + transactionId: "txn-1", + generationNumber: "9", + }); + expect(result).toBeUndefined(); + }); + + it("propagates errors from the client", async () => { + mockClient.saveBachPinboard = vi + .fn() + .mockRejectedValue(new Error("bach save failed")); + + const service = new ThoughtSpotService(mockClient); + await expect(service.saveBachPinboard("txn-1", "9")).rejects.toThrow( + "bach save failed", + ); + }); + }); }); From 2869eaa617f9766f0c3f869020e57cd7b9dc4dec Mon Sep 17 00:00:00 2001 From: Rohit V Date: Tue, 9 Jun 2026 16:51:31 +0530 Subject: [PATCH 3/5] Fixed null handling from aurora upstream --- src/servers/tool-definitions.ts | 12 ++++---- src/spotterviz/types.ts | 12 ++++---- test/spotterviz/spotterviz-service.spec.ts | 35 ++++++++++++++++++++++ 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/src/servers/tool-definitions.ts b/src/servers/tool-definitions.ts index d1f730b..5f24be3 100644 --- a/src/servers/tool-definitions.ts +++ b/src/servers/tool-definitions.ts @@ -337,12 +337,12 @@ export const SpotterVizUpdateSchema = z.object({ data: z .record(z.string(), z.unknown()) .describe("Raw event payload as emitted by Aurora."), - message_id: z.string().optional(), - idx: z.number().optional(), - timestamp: z.string().optional(), - tool_id: z.string().optional(), - group_id: z.string().optional(), - heading: z.string().optional(), + message_id: z.string().nullish(), + idx: z.number().nullish(), + timestamp: z.string().nullish(), + tool_id: z.string().nullish(), + group_id: z.string().nullish(), + heading: z.string().nullish(), }); export const SpotterVizGetUpdatesOutputSchema = z.object({ diff --git a/src/spotterviz/types.ts b/src/spotterviz/types.ts index 9538f60..b2df229 100644 --- a/src/spotterviz/types.ts +++ b/src/spotterviz/types.ts @@ -48,12 +48,12 @@ export interface CreateSpotterVizSessionResult { export interface SpotterVizEvent extends Record { event_type: string; data: Record; - message_id?: string; - idx?: number; - timestamp?: string; - tool_id?: string; - group_id?: string; - heading?: string; + message_id?: string | null; + idx?: number | null; + timestamp?: string | null; + tool_id?: string | null; + group_id?: string | null; + heading?: string | null; } export interface SubmitSpotterVizQueryParams { diff --git a/test/spotterviz/spotterviz-service.spec.ts b/test/spotterviz/spotterviz-service.spec.ts index 02b8f33..03107f3 100644 --- a/test/spotterviz/spotterviz-service.spec.ts +++ b/test/spotterviz/spotterviz-service.spec.ts @@ -1,4 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { SpotterVizGetUpdatesOutputSchema } from "../../src/servers/tool-definitions"; import type { SpotterVizClient } from "../../src/spotterviz/spotterviz-client"; import { SpotterVizService } from "../../src/spotterviz/spotterviz-service"; import * as sseStreamModule from "../../src/spotterviz/spotterviz-sse-stream"; @@ -287,6 +288,40 @@ describe("SpotterVizService.submitQuery", () => { }); }); +// --------------------------------------------------------------------------- +// Output schema null tolerance +// --------------------------------------------------------------------------- + +describe("SpotterVizGetUpdatesOutputSchema", () => { + it("accepts events where optional metadata fields are explicit null", () => { + // Aurora emits `tool_id: null` (and similar) for events where the field + // doesn't apply — the schema must accept null, not just undefined, or the + // host will reject the tool response. + const sampleResponse = { + updates: [ + { + event_type: "meta.progress", + data: { + stage: "working", + message: null, + card_type: null, + }, + message_id: "msg-1", + idx: 1, + timestamp: "2026-06-09T11:12:19Z", + tool_id: null, + group_id: "grp-1", + heading: "Understanding user's prompt", + }, + ], + is_done: false, + }; + + const result = SpotterVizGetUpdatesOutputSchema.safeParse(sampleResponse); + expect(result.success).toBe(true); + }); +}); + // --------------------------------------------------------------------------- // saveLiveboard // --------------------------------------------------------------------------- From 0f0dc4d1fb55a9ffd5159ccc34551f9055cca0e9 Mon Sep 17 00:00:00 2001 From: Rohit V Date: Tue, 16 Jun 2026 19:07:33 +0530 Subject: [PATCH 4/5] Descriptions updated --- src/servers/tool-definitions.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/servers/tool-definitions.ts b/src/servers/tool-definitions.ts index 5f24be3..28c1580 100644 --- a/src/servers/tool-definitions.ts +++ b/src/servers/tool-definitions.ts @@ -472,7 +472,7 @@ export const toolDefinitionsV2 = [ { name: ToolName.CreateAnalysisSession, description: - "Start an analytical session with the Analytics Agent. This is the first step in a three-step workflow: create a session, send a message, then poll for updates. Once created, you can use the returned `analytical_session_id` to send analytical questions via `send_session_message` and retrieve answers via `get_session_updates`. Sessions are conversational, so you can ask follow-up questions in the same session without creating a new one. Using a single analytical session is preferable, because it reuses the same data source selection.", + "Start an analytical session with the Analytics Agent. This is the first step in a three-step workflow: create a session, send a message, then poll for updates. Once created, you can use the returned `analytical_session_id` to send analytical questions via `send_session_message` and retrieve answers via `get_session_updates`. Sessions are conversational, so you can ask follow-up questions in the same session without creating a new one. Using a single analytical session is preferable, because it reuses the same data source selection. When the user's intent is to create a dashboard, use the first message in this session to plan: ask the Analytics Agent what data and metrics are available for the topic, what trends or breakdowns would be most meaningful, and what chart types best fit the data. Use the Agent's response to form a dashboard plan before requesting any specific charts.", inputSchema: z.toJSONSchema(CreateAnalysisSessionInputSchema), outputSchema: z.toJSONSchema(CreateAnalysisSessionOutputSchema), annotations: { @@ -485,7 +485,7 @@ export const toolDefinitionsV2 = [ { name: ToolName.SendSessionMessage, description: - "Send a message to a session with the Analytics Agent. The Agent may take some time to think and generate a response, so the response will not be returned immediately. Instead, you can use the `get_session_updates` tool to query for the latest updates on the session. After the Agent finishes responding (when `get_session_updates` returns `is_done: true`), you can send another message to the same session to ask follow-up questions without creating a new session. Do not send a new message until the Agent has finished responding to the previous message (when `get_session_updates` returns `is_done: true`). If the user wants to create a dashboard, do not send a message with that request; instead use the `create_dashboard` tool.", + "Send a message to a session with the Analytics Agent. The Agent may take some time to think and generate a response, so the response will not be returned immediately. Instead, you can use the `get_session_updates` tool to query for the latest updates on the session. After the Agent finishes responding (when `get_session_updates` returns `is_done: true`), you can send another message to the same session to ask follow-up questions without creating a new session. Do not send a new message until the Agent has finished responding to the previous message (when `get_session_updates` returns `is_done: true`). When the user's intent is to create a dashboard, the first message must be a planning query — ask what data and metrics are available, what key breakdowns or trends exist, and what charts would be most insightful. Only after receiving and synthesising the Agent's planning response should you send follow-up messages to request specific charts. If the user wants to create a dashboard from the charts collected, use the `create_dashboard` tool — do not send that request as a message to the Agent.", inputSchema: z.toJSONSchema(SendSessionMessageInputSchema), outputSchema: z.toJSONSchema(SendSessionMessageOutputSchema), annotations: { @@ -511,7 +511,7 @@ export const toolDefinitionsV2 = [ { name: ToolName.CreateDashboard, description: - "Create a dashboard from a list of answers, allowing the user to revisit the results later. You can use this if the user asks for a dashboard or liveboard, or asks to save the results from the analysis. This can be a useful way to save the results to revisit later, or present them to other users.", + "Create a dashboard from a list of answers, allowing the user to revisit the results later. You can use this if the user asks for a dashboard or liveboard, or asks to save the results from the analysis. This can be a useful way to save the results to revisit later, or present them to other users. After creating the dashboard, always call `spotterviz_create_session` with the returned `liveboard_id` as `existing_liveboard_id` to apply layout and styling improvements via SpotterViz.", inputSchema: z.toJSONSchema(CreateDashboardInputSchema), outputSchema: z.toJSONSchema(CreateDashboardOutputSchema), annotations: { @@ -524,7 +524,7 @@ export const toolDefinitionsV2 = [ { name: ToolName.SpotterVizCreateSession, description: - "Start a SpotterViz (liveboard agent) session against either a brand-new empty liveboard or an existing one. Exactly one of `new_liveboard_name` or `existing_liveboard_id` must be provided. When `new_liveboard_name` is set, an empty liveboard is created first. The returned `spotterviz_session_id` is the identifier for follow-up SpotterViz tools that send messages and stream updates.", + "Start a SpotterViz session. SpotterViz is a layout and styling agent — it does NOT generate new analytical charts or answers. If the user wants a dashboard with data, always use Spotter3 tools first (`create_analysis_session` → `send_session_message` → `get_session_updates` → `create_dashboard`) to generate the charts, then call this tool with `existing_liveboard_id` set to the `liveboard_id` returned by `create_dashboard`. Exactly one of `new_liveboard_name` or `existing_liveboard_id` must be provided. Use `existing_liveboard_id` when you have a liveboard from a prior `create_dashboard` call — SpotterViz will apply layout and styling to those existing charts. Use `new_liveboard_name` only for a blank-canvas scenario where the user explicitly wants no analytical content. The returned `spotterviz_session_id` is the identifier for follow-up SpotterViz tools.", inputSchema: z.toJSONSchema(SpotterVizCreateSessionInputSchema), outputSchema: z.toJSONSchema(SpotterVizCreateSessionOutputSchema), annotations: { @@ -537,7 +537,7 @@ export const toolDefinitionsV2 = [ { name: ToolName.SpotterVizSubmitQuery, description: - "Submit a natural-language message to an existing SpotterViz session. The SpotterViz agent streams its response asynchronously, so this tool returns immediately once streaming starts. Poll `spotterviz_get_updates` to retrieve the response. Do not call this again on the same `spotterviz_session_id` until the previous turn has finished (i.e. `spotterviz_get_updates` returns the turn-done signal); calling it concurrently will be rejected.", + "Submit a styling prompt to an existing SpotterViz session. Keep the prompt short and vague — SpotterViz is capable of making good layout and styling decisions autonomously. A simple prompt like 'organize and beautify this dashboard' is preferred over specifying exact arrangements. Only include specific styling or layout instructions if the user has explicitly asked for them (e.g. the user said 'put the revenue chart first' or 'use a dark theme'). Do NOT send analytical questions or requests to create new charts; SpotterViz cannot generate analytical content. If new analysis is needed, use the Spotter3 tools (`create_analysis_session`, `send_session_message`, `get_session_updates`) instead. The SpotterViz agent streams its response asynchronously, so this tool returns immediately once streaming starts — poll `spotterviz_get_updates` to retrieve the response. Do not call this again on the same `spotterviz_session_id` until the previous turn has finished (i.e. `spotterviz_get_updates` returns the turn-done signal); calling it concurrently will be rejected.", inputSchema: z.toJSONSchema(SpotterVizSubmitQueryInputSchema), outputSchema: z.toJSONSchema(SpotterVizSubmitQueryOutputSchema), annotations: { @@ -550,7 +550,7 @@ export const toolDefinitionsV2 = [ { name: ToolName.SpotterVizGetUpdates, description: - "Get the latest streaming events from a SpotterViz session. Call this after `spotterviz_submit_query` and continue polling until `is_done` is true. The tool waits adaptively for new events (with internal exponential backoff up to 16 s) and returns early as soon as any events arrive or the turn finishes, so back-to-back calls cost no more than a quick poll when activity is high. An empty `updates` list with `is_done: false` simply means the agent is still thinking — call again to keep polling.", + "Get the latest streaming events from a SpotterViz session. Call this after `spotterviz_submit_query` and continue polling until `is_done` is true. When `is_done` is true, immediately call `spotterviz_save_liveboard` — do not skip this step. The tool waits adaptively for new events (with internal exponential backoff up to 16 s) and returns early as soon as any events arrive or the turn finishes, so back-to-back calls cost no more than a quick poll when activity is high. An empty `updates` list with `is_done: false` simply means the agent is still thinking — call again to keep polling.", inputSchema: z.toJSONSchema(SpotterVizGetUpdatesInputSchema), outputSchema: z.toJSONSchema(SpotterVizGetUpdatesOutputSchema), annotations: { @@ -563,7 +563,7 @@ export const toolDefinitionsV2 = [ { name: ToolName.SpotterVizSaveLiveboard, description: - "Persist the current state of the SpotterViz session's liveboard back to ThoughtSpot. Call this when the user has reached a result they want to keep — for example after a series of `spotterviz_submit_query` turns. The session stays active after saving, so further `spotterviz_submit_query` calls on the same session id continue to work. Returns a `liveboard_url` you can surface to the user as a link to the saved liveboard.", + "Persist the current state of the SpotterViz session's liveboard back to ThoughtSpot. Always call this after `spotterviz_get_updates` returns `is_done: true` — do not end the SpotterViz flow without saving. The session stays active after saving, so further `spotterviz_submit_query` calls on the same session id continue to work. Returns a `liveboard_url` you must surface to the user as a direct link to the saved liveboard.", inputSchema: z.toJSONSchema(SpotterVizSaveLiveboardInputSchema), outputSchema: z.toJSONSchema(SpotterVizSaveLiveboardOutputSchema), annotations: { From 5b34fc173e004af0231d84d4fe510946c9ea54e1 Mon Sep 17 00:00:00 2001 From: "cloudflare-workers-and-pages[bot]" <73139402+cloudflare-workers-and-pages[bot]@users.noreply.github.com> Date: Tue, 16 Jun 2026 13:38:55 +0000 Subject: [PATCH 5/5] Update wrangler config name to rifdhan-test-mcp-server --- wrangler.jsonc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrangler.jsonc b/wrangler.jsonc index d74a362..0914cd6 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -5,7 +5,7 @@ { "keep_vars": true, "$schema": "node_modules/wrangler/config-schema.json", - "name": "thoughtspot-mcp-server", + "name": "rifdhan-test-mcp-server", "main": "src/index.ts", "compatibility_date": "2025-04-17", "compatibility_flags": [