Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/bearer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ async function handleTokenAuth(
accessToken: accessToken,
instanceUrl: validateAndSanitizeUrl(tsHost),
clientName,
// Distinguishes static-token auth ("bearer"/"token") from OAuth. Used to gate
// OAuth-only tools such as `list_orgs`.
authMode: authRouteFamily,
};
const requestedApiVersion = url.searchParams.get("api-version");

Expand Down
3 changes: 3 additions & 0 deletions src/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ class Handler {
scope: oauthReqInfo.scope,
props: {
accessToken: token.data.token,
refreshToken: token.data.refreshToken,
tokenCreatedTime: token.data.tokenCreatedTime,
tokenExpiryDuration: token.data.tokenExpiryDuration,
instanceUrl: instanceUrl,
clientName: clientName,
} as Props,
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ function createMCPRouter(
? normalizeRequestedApiVersionForAnalytics(requestedApiVersion)
: undefined,
apiVersionMode,
// These routers are only mounted for the OAuth endpoints (/mcp, /sse).
authMode: "oauth",
};

// Route to the appropriate serve method
Expand Down
3 changes: 3 additions & 0 deletions src/metrics/runtime/tool-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ export const UPSTREAM_OPERATION_NAMES = {
"send_agent_conversation_message_streaming",
importMetadataTml: "import_metadata_tml",
searchMetadata: "search_metadata",
searchOrgs: "search_orgs",
fetchOrgBearerToken: "fetch_org_bearer_token",
getRefreshedToken: "get_refreshed_token",
} as const;

export type UpstreamOperation =
Expand Down
21 changes: 21 additions & 0 deletions src/servers/conversation-storage-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const STORAGE_BATCH_SIZE = 127; // Cloudflare DO bulk get/put limit is 128, we u

const MESSAGE_KEY_PREFIX = "message-";
const IS_DONE_KEY = "is-done";
// Storage key for the active org on the per-user active-org DO instance.
const ACTIVE_ORG_KEY = "active-org";
const WRITE_BOOKMARK_KEY = "write-bookmark";
const READ_BOOKMARK_KEY = "read-bookmark";

Expand Down Expand Up @@ -56,6 +58,25 @@ export class ConversationStorageServerSQLite {
return Response.json(state);
}

// Active-org state. Stored on a DO addressed by the user's storage-key
// hash (not a conversation id), so it is shared across all of the
// user's MCP sessions. No TTL: it must persist until the user switches
// again or reauthenticates (a new login yields a new hash).
case "GET /active-org": {
const activeOrgId =
(await this.state.storage.get<string>(ACTIVE_ORG_KEY)) ?? null;
return Response.json({ activeOrgId });
}

case "POST /active-org": {
const body = (await request.json()) as { activeOrgId: string };
await this.state.storage.put<string>(
ACTIVE_ORG_KEY,
body.activeOrgId,
);
return Response.json({ ok: true });
}

default:
return new Response("Not Found", { status: 404 });
}
Expand Down
102 changes: 92 additions & 10 deletions src/servers/mcp-server-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ export abstract class BaseMCPServer extends Server {
return String(this.sessionInfo.enableSpotterDataSourceDiscovery) === "true";
}

/**
* Whether Orgs are enabled on this cluster (from session info). Fails closed:
* if session info is unavailable or the flag is absent, returns false so the
* org tools stay hidden.
*/
protected isOrgsEnabled(): boolean {
if (!this.sessionInfo) {
return false;
}
return this.sessionInfo.orgsEnabled === true;
}

/**
* Initialize span with common attributes (user_guid and instance_url)
*/
Expand Down Expand Up @@ -187,34 +199,90 @@ export abstract class BaseMCPServer extends Server {
};
}

protected async getStorageService(): Promise<StorageServiceClient> {
const accessToken = this.ctx.props.accessToken;
if (!accessToken || accessToken.length === 0) {
throw new Error("Access token is required to use Storage Service");
/**
* Stable per-login hash used to namespace this user's durable storage (both
* conversation buffers and active-org state), keeping users isolated.
*
* Keyed on the refresh token when present (OAuth): it is stable across the
* access token's 24h rotation and only changes on full reauthentication, so
* storage survives token refresh and resets on reauth. Falls back to the
* access token for static bearer/token connections, which have no refresh
* token (their token is long-lived).
*/
protected async getStorageKeyHash(): Promise<string> {
const keyToken = this.ctx.props.refreshToken ?? this.ctx.props.accessToken;
if (!keyToken || keyToken.length === 0) {
throw new Error("A token is required to derive the storage key");
}
const encodedAccessToken = new TextEncoder().encode(accessToken);
const hashBuffer = await crypto.subtle.digest(
"SHA-256",
encodedAccessToken,
);
const hashUrlSafe = Buffer.from(new Uint8Array(hashBuffer)).toString(
"base64url",
new TextEncoder().encode(keyToken),
);
return Buffer.from(new Uint8Array(hashBuffer)).toString("base64url");
}

protected async getStorageService(): Promise<StorageServiceClient> {
const hashUrlSafe = await this.getStorageKeyHash();
return new StorageServiceClient(
this.ctx.env
.CONVERSATION_STORAGE_OBJECT as unknown as DurableObjectNamespace,
hashUrlSafe,
);
}

/**
* The org currently active for this session, if any. When set, all
* ThoughtSpot calls are scoped to this org via the x-thoughtspot-orgs header.
* Subclasses override this to expose their per-session org state. Defaults to
* undefined (the user's default org, as resolved by the cluster).
*/
protected getActiveOrgId(): string | undefined {
return undefined;
}

/**
* The bearer token to use for ThoughtSpot calls. Defaults to the token from
* the session. Subclasses override this to return an org-scoped bearer token
* when an org has been selected.
*/
protected getActiveBearerToken(): string {
return this.ctx.props.accessToken;
}

/**
* Build a ThoughtSpot service bound to an explicit bearer token and org,
* bypassing the active-org/token resolution. Used for org-token minting,
* which must authenticate with a specific token.
*/
protected getThoughtSpotServiceWithToken(
bearerToken: string,
orgId?: string,
recorder?: MetricsRecorder,
analyticsContextOverride?: MetricAnalyticsContext,
) {
return new ThoughtSpotService(
getThoughtSpotClient(this.ctx.props.instanceUrl, bearerToken, orgId),
{
recorder,
metricsEnv: this.ctx.env as unknown as Record<string, unknown>,
waitUntil: this.getMetricsWaitUntil(),
analyticsContext: this.mergeMetricAnalyticsContext(
analyticsContextOverride,
),
eventIdentity: this.getMetricEventIdentity(),
},
);
}

protected getThoughtSpotService(
recorder?: MetricsRecorder,
analyticsContextOverride?: MetricAnalyticsContext,
) {
return new ThoughtSpotService(
getThoughtSpotClient(
this.ctx.props.instanceUrl,
this.ctx.props.accessToken,
this.getActiveBearerToken(),
this.getActiveOrgId(),
),
{
recorder,
Expand Down Expand Up @@ -439,8 +507,22 @@ export abstract class BaseMCPServer extends Server {
});
},
);

// Subclass post-initialization hook (runs after sessionInfo is available
// and handlers are registered). Best-effort: failures must not break the
// connection.
try {
await this.postInit();
} catch (error) {
console.error("postInit failed:", error);
}
}

/**
* Optional hook for subclasses to run setup after init(). Default no-op.
*/
protected async postInit(): Promise<void> {}

async addTracker(tracker: Tracker) {
this.trackers.add(tracker);
}
Expand Down
Loading