Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions packages/appkit/src/context/execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,20 @@ export function getWorkspaceId(): Promise<string> {
}

/**
* Check if currently running in a user context.
* Check if currently running in a user context. Sugar for
* `getCurrentUserContext() !== null` β€” prefer {@link getCurrentUserContext}
* when you also need the value.
*/
export function isInUserContext(): boolean {
const ctx = executionContextStorage.getStore();
return ctx !== undefined;
return getCurrentUserContext() !== null;
}

/**
* Returns the active {@link UserContext} when inside a
* `runInUserContext` scope (set by `plugin.asUser(req).method(...)`),
* or `null`. Use to forward user identity to a downstream layer
* (e.g. spawning a durable task that should run as the caller).
*/
export function getCurrentUserContext(): UserContext | null {
return executionContextStorage.getStore() ?? null;
}
1 change: 1 addition & 0 deletions packages/appkit/src/context/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export {
getCurrentUserContext,
getCurrentUserId,
getExecutionContext,
getWarehouseId,
Expand Down
24 changes: 24 additions & 0 deletions packages/appkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,30 @@ export {
ResourceRegistry,
ResourceType,
} from "./registry";
// Durable execution. `TaskManager` is exported type-only: it's
// constructed by `createApp` and addressed via `this.task` inside
// plugins; internal statics must not leak to consumers.
export {
type ExecuteTaskSettings,
type ResumeOptions,
type SseEvent,
type StopOptions,
type StreamEvent,
type SubmitOptions,
setupSseHeaders,
step,
TASK_IDEMPOTENCY_HEADER,
type Task,
type TaskConfig,
type TaskContext,
type TaskDefinition,
type TaskEvent,
type TaskHandle,
type TaskManager,
type TaskRef,
type TypedTaskContext,
writeSseFrame,
} from "./tasks";
// Telemetry (for advanced custom telemetry)
export {
type Counter,
Expand Down
71 changes: 65 additions & 6 deletions packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ import type { PluginContext } from "../core/plugin-context";
import { AppKitError, AuthenticationError } from "../errors";
import { createLogger } from "../logging/logger";
import { StreamManager } from "../stream";
import { TaskManager } from "../tasks";
import {
type ExecuteTaskSettings,
executeTask as executeTaskImpl,
TaskManager,
type TaskRef,
} from "../tasks";
import {
type ITelemetry,
normalizeTelemetryOptions,
Expand Down Expand Up @@ -74,11 +79,14 @@ function hasHttpStatusCode(

/**
* Methods that should not be proxied by asUser().
* These are lifecycle/internal methods that don't make sense
* to execute in a user context.
* Lifecycle/internal methods that don't make sense in a user context.
*
* Note: `executeTask` is deliberately NOT excluded β€” it MUST run inside
* the proxy's `runInUserContext` so `getCurrentUserContext()` forwards
* the OBO context to the engine. Excluding it would silently downgrade
* OBO calls to SP.
*/
const EXCLUDED_FROM_PROXY = new Set([
// Lifecycle methods
"setup",
"shutdown",
"attachContext",
Expand All @@ -87,9 +95,8 @@ const EXCLUDED_FROM_PROXY = new Set([
"getSkipBodyParsingPaths",
"abortActiveOperations",
"clientConfig",
// asUser itself - prevent chaining like .asUser().asUser()
// Prevent chained .asUser().asUser().
"asUser",
// Internal methods
"constructor",
]);

Expand Down Expand Up @@ -542,6 +549,58 @@ export abstract class Plugin<
);
}

/**
* Bridges a registered durable task to an SSE response.
*
* Submits via `this.task.start(...)`, subscribes, writes each event
* as `id: <seq>\nevent: <name>\ndata: <json>`. Supports
* `Last-Event-ID` reconnect from the WAL.
*
* Identity comes from the active `runInUserContext` scope. For OBO,
* call through the proxy: `this.asUser(req).executeTask(...)`.
*
* Unlike `execute()` / `executeStream()`, this does not accept
* `retry` / `cache` / `timeout` β€” the task engine handles them
* natively.
*
* @throws if the app was created with `createApp({ task: false })`.
*
* @example
* ```ts
* // SP
* await this.executeTask(res, "agent-loop", req.body);
*
* // OBO, no cancel on reconnect
* await this.asUser(req).executeTask(res, "agent-loop", req.body, {
* cancelOnDisconnect: false,
* });
* ```
*/
protected async executeTask<
TInput = unknown,
TResult = unknown,
TEvents extends Record<string, unknown> = Record<string, unknown>,
>(
res: express.Response,
task: string | TaskRef<TInput, TResult, TEvents>,
input: TInput,
settings?: ExecuteTaskSettings,
): Promise<void> {
if (!this.task) {
throw new Error(
"executeTask() requires durable execution, but the app was created with createApp({ task: false }).",
);
}
const taskName = typeof task === "string" ? task : task.name;
return executeTaskImpl(
{ manager: this.task, telemetry: this.telemetry, pluginName: this.name },
res,
taskName,
input,
settings,
);
}

/**
* Execute a function with the plugin's interceptor chain.
*
Expand Down
Loading