Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/metrics/runtime/tool-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export const UPSTREAM_OPERATION_NAMES = {
"send_agent_conversation_message_streaming",
importMetadataTml: "import_metadata_tml",
searchMetadata: "search_metadata",
createBachPinboardSession: "create_bach_pinboard_session",
saveBachPinboard: "save_bach_pinboard",
createAuroraSession: "create_aurora_session",
submitAuroraQuery: "submit_aurora_query",
} as const;

export type UpstreamOperation =
Expand Down Expand Up @@ -130,6 +134,32 @@ export function recordUpstreamCallMetrics(
recorder.histogram(METRIC_NAMES.upstreamDurationMs, durationMs, labels);
}

/**
* Run an upstream call and record its outcome + duration via `recordUpstreamCallMetrics`.
* Errors propagate to the caller — the metric is recorded as `upstream_error` first.
*/
export async function observeUpstreamCall<T>(
recorder: MetricsRecorder | undefined,
operation: UpstreamOperation,
call: () => Promise<T>,
): Promise<T> {
const startedAt = Date.now();
let outcome: MetricOutcome = "success";
try {
return await call();
} catch (error) {
outcome = "upstream_error";
throw error;
} finally {
recordUpstreamCallMetrics(
recorder,
operation,
outcome,
Date.now() - startedAt,
);
}
}

export function recordUpstreamStreamStartedMetric(
recorder: MetricsRecorder | undefined,
operation: UpstreamOperation,
Expand Down
35 changes: 35 additions & 0 deletions src/servers/conversation-storage-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const MESSAGE_KEY_PREFIX = "message-";
const IS_DONE_KEY = "is-done";
const WRITE_BOOKMARK_KEY = "write-bookmark";
const READ_BOOKMARK_KEY = "read-bookmark";
const METADATA_KEY = "metadata";

export type ConversationMetadata = Record<string, unknown>;

/**
* A Durable Object that stores streaming conversation messages and exposes them over HTTP.
Expand All @@ -20,6 +23,8 @@ const READ_BOOKMARK_KEY = "read-bookmark";
* POST /storage/<conversation-id>/initialize —> initializeConversation
* POST /storage/<conversation-id>/append —> appendMessagesAndRestartTtl
* GET /storage/<conversation-id>/messages —> getNewMessagesAndUpdateBookmark
* GET /storage/<conversation-id>/metadata —> getMetadata
* PATCH /storage/<conversation-id>/metadata —> mergeMetadataAndRestartTtl
*/
export class ConversationStorageServerSQLite {
private conversationId = "";
Expand Down Expand Up @@ -56,6 +61,21 @@ export class ConversationStorageServerSQLite {
return Response.json(state);
}

case "GET /metadata": {
const metadata =
await this.state.storage.get<ConversationMetadata>(METADATA_KEY);
if (metadata === undefined) {
return Response.json({ error: "Not found" }, { status: 404 });
}
return Response.json(metadata);
}

case "PATCH /metadata": {
const patch = (await request.json()) as ConversationMetadata;
const merged = await this.mergeMetadataAndRestartTtl(patch);
return Response.json(merged);
}

default:
return new Response("Not Found", { status: 404 });
}
Expand Down Expand Up @@ -86,6 +106,21 @@ export class ConversationStorageServerSQLite {
await this.restartTtl();
}

/*
* Shallow-merge `patch` into the existing metadata object. Top-level keys in `patch`
* overwrite existing keys. And restarts the TTL.
*/
private async mergeMetadataAndRestartTtl(
patch: ConversationMetadata,
): Promise<ConversationMetadata> {
const existing =
(await this.state.storage.get<ConversationMetadata>(METADATA_KEY)) ?? {};
const merged = { ...existing, ...patch };
await this.state.storage.put<ConversationMetadata>(METADATA_KEY, merged);
await this.restartTtl();
return merged;
}

/*
* Append new messages to the conversation, starting at the current state of WRITE_BOOKMARK and
* saving the new state of WRITE_BOOKMARK after. Writes are done un bulk, but batched if there
Expand Down
16 changes: 16 additions & 0 deletions src/servers/mcp-server-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import {
recordToolInvocationMetrics,
} from "../metrics/runtime/tool-metrics";
import { getActiveSpan, withSpan } from "../metrics/tracing/tracing-utils";
import { SpotterVizClient } from "../spotterviz/spotterviz-client";
import { SpotterVizService } from "../spotterviz/spotterviz-service";
import { StorageServiceClient } from "../storage-service/storage-service";
import { getThoughtSpotClient } from "../thoughtspot/thoughtspot-client";
import { ThoughtSpotService } from "../thoughtspot/thoughtspot-service";
Expand Down Expand Up @@ -228,6 +230,20 @@ export abstract class BaseMCPServer extends Server {
);
}

protected async getSpotterVizService(
recorder?: MetricsRecorder,
): Promise<SpotterVizService> {
return new SpotterVizService(
this.getThoughtSpotService(recorder),
await this.getStorageService(),
new SpotterVizClient(
this.ctx.props.instanceUrl,
this.ctx.props.accessToken,
),
recorder,
);
}

protected abstract getToolMetricApiSurface(): ToolMetricApiSurface;

protected getToolMetricApiVersionLabel(): string | undefined {
Expand Down
154 changes: 154 additions & 0 deletions src/servers/mcp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import {
GetRelevantQuestionsSchema,
GetSessionUpdatesInputSchema,
SendSessionMessageInputSchema,
SpotterVizCreateSessionInputSchema,
SpotterVizGetUpdatesInputSchema,
SpotterVizSaveLiveboardInputSchema,
SpotterVizSubmitQueryInputSchema,
ToolName,
} from "./tool-definitions";
import {
Expand Down Expand Up @@ -241,6 +245,22 @@ export class MCPServer extends BaseMCPServer {
return this.callCreateDashboard(request, recorder);
}

case ToolName.SpotterVizCreateSession: {
return this.callSpotterVizCreateSession(request, recorder);
}

case ToolName.SpotterVizSubmitQuery: {
return this.callSpotterVizSubmitQuery(request, recorder);
}

case ToolName.SpotterVizGetUpdates: {
return this.callSpotterVizGetUpdates(request, recorder);
}

case ToolName.SpotterVizSaveLiveboard: {
return this.callSpotterVizSaveLiveboard(request, recorder);
}

default:
throw new Error(`Unknown tool: ${name}`);
}
Expand Down Expand Up @@ -605,4 +625,138 @@ Provide this url to the user as a link to view the liveboard in ThoughtSpot.`;
};
return this._sources;
}

@WithSpan("call-spotterviz-save-liveboard")
async callSpotterVizSaveLiveboard(
request: z.infer<typeof CallToolRequestSchema>,
recorder: MetricsRecorder,
) {
const { spotterviz_session_id } = SpotterVizSaveLiveboardInputSchema.parse(
request.params.arguments,
);

try {
const service = await this.getSpotterVizService(recorder);
const { liveboardId, liveboardUrl } = await service.saveLiveboard({
spotterVizSessionId: spotterviz_session_id,
});

return this.createStructuredContentSuccessResponse(
{ liveboard_id: liveboardId, liveboard_url: liveboardUrl },
"SpotterViz liveboard saved successfully",
);
} catch (error) {
console.error("Error saving SpotterViz liveboard:", error);
return this.createErrorResponse(
`Failed to save SpotterViz liveboard: ${(error as Error).message}`,
"SpotterViz save liveboard failed",
);
}
}

@WithSpan("call-spotterviz-get-updates")
async callSpotterVizGetUpdates(
request: z.infer<typeof CallToolRequestSchema>,
recorder: MetricsRecorder,
) {
const { spotterviz_session_id } = SpotterVizGetUpdatesInputSchema.parse(
request.params.arguments,
);

try {
const service = await this.getSpotterVizService(recorder);
const { updates, isDone } = await service.getUpdates({
spotterVizSessionId: spotterviz_session_id,
});

return this.createStructuredContentSuccessResponse(
{ updates, is_done: isDone },
"SpotterViz session updates retrieved successfully",
);
} catch (error) {
console.error("Error getting SpotterViz updates:", error);
return this.createErrorResponse(
`Failed to get SpotterViz updates: ${(error as Error).message}`,
"SpotterViz get updates failed",
);
}
}

@WithSpan("call-spotterviz-submit-query")
async callSpotterVizSubmitQuery(
request: z.infer<typeof CallToolRequestSchema>,
recorder: MetricsRecorder,
) {
const { spotterviz_session_id, message } =
SpotterVizSubmitQueryInputSchema.parse(request.params.arguments);

const storageService = await this.getStorageService();
try {
await storageService.initializeConversation(spotterviz_session_id);
} catch (error) {
console.error(
"Error initializing SpotterViz conversation in storage service:",
error,
);
return this.createErrorResponse(
"The SpotterViz session has an ongoing response to the previous message. Please continue to call `spotterviz_get_updates` until `is_done` is true before sending a followup message.",
`Error submitting SpotterViz query for session ${spotterviz_session_id}: ${error}`,
);
}

try {
const service = await this.getSpotterVizService(recorder);
const { streamPromise } = await service.submitQuery({
spotterVizSessionId: spotterviz_session_id,
message,
});

// Hand the stream-drain off to the Worker runtime so we can return immediately.
// Falls through harmlessly in tests / non-Worker runtimes where waitUntil is absent.
this.ctx.ctx?.waitUntil?.(streamPromise);

return this.createStructuredContentSuccessResponse(
{ success: true },
"SpotterViz query submitted successfully",
);
} catch (error) {
console.error("Error submitting SpotterViz query:", error);
return this.createErrorResponse(
`Failed to submit SpotterViz query: ${(error as Error).message}`,
"SpotterViz submit query failed",
);
}
}

@WithSpan("call-spotterviz-create-session")
async callSpotterVizCreateSession(
request: z.infer<typeof CallToolRequestSchema>,
recorder: MetricsRecorder,
) {
const { new_liveboard_name, existing_liveboard_id } =
SpotterVizCreateSessionInputSchema.parse(request.params.arguments);

try {
const service = await this.getSpotterVizService(recorder);
const result = await service.createSession({
newLiveboardName: new_liveboard_name,
existingLiveboardId: existing_liveboard_id,
});

return this.createStructuredContentSuccessResponse(
{
spotterviz_session_id: result.spotterVizSessionId,
liveboard_id: result.liveboardId,
liveboard_name: result.liveboardName,
},
"SpotterViz session created successfully",
);
} catch (error) {
console.error("Error creating SpotterViz session:", error);
return this.createErrorResponse(
`Failed to create SpotterViz session: ${(error as Error).message}`,
"SpotterViz session create failed",
);
}
}
}
Loading
Loading