diff --git a/.gitignore b/.gitignore index 5d417368..b8c83e84 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ coverage .turbo .databricks + +.appkit/ \ No newline at end of file diff --git a/apps/dev-playground/.gitignore b/apps/dev-playground/.gitignore index 1f4745f5..76c8ae98 100644 --- a/apps/dev-playground/.gitignore +++ b/apps/dev-playground/.gitignore @@ -3,4 +3,6 @@ test-results/ playwright-report/ # Auto-generated types (endpoint-specific, varies per developer) -shared/appkit-types/serving.d.ts \ No newline at end of file +shared/appkit-types/serving.d.ts + +.appkit/ diff --git a/knip.json b/knip.json index 4fb87b38..ee7482e0 100644 --- a/knip.json +++ b/knip.json @@ -15,6 +15,7 @@ "**/*.generated.ts", "**/*.example.tsx", "**/*.css", + "packages/appkit/vendor/**", "packages/appkit/src/plugins/vector-search/**", "packages/appkit/src/plugin/index.ts", "packages/appkit/src/plugin/to-plugin.ts", diff --git a/packages/appkit/src/core/appkit.ts b/packages/appkit/src/core/appkit.ts index 28fa5605..0e599d23 100644 --- a/packages/appkit/src/core/appkit.ts +++ b/packages/appkit/src/core/appkit.ts @@ -15,7 +15,9 @@ import { TelemetryReporter, } from "../internal-telemetry"; import { ResourceRegistry, ResourceType } from "../registry"; +import type { TaskConfig } from "../tasks"; import type { TelemetryConfig } from "../telemetry"; +import { registerGracefulShutdownHandlers } from "./graceful-shutdown"; import { isToolProvider, PluginContext } from "./plugin-context"; import { type ServiceManager, startCoreServices } from "./service-manager"; @@ -193,6 +195,7 @@ export class AppKit { plugins?: T; telemetry?: TelemetryConfig; cache?: CacheConfig; + task?: TaskConfig | false; client?: WorkspaceClient; onPluginsReady?: (appkit: PluginMap) => void | Promise; disableInternalTelemetry?: boolean; @@ -201,6 +204,7 @@ export class AppKit { const services = await startCoreServices({ telemetry: config?.telemetry, cache: config?.cache, + task: config?.task, }); try { 
@@ -241,11 +245,11 @@ export class AppKit { const serverPlugin = instance.#pluginInstances.server; if (serverPlugin && typeof (serverPlugin as any).start === "function") { - await (serverPlugin as any).start({ - shutdownCoreServices: () => services.stop(), - }); + await (serverPlugin as any).start(); } + registerGracefulShutdownHandlers(serverPlugin, services); + return handle; } catch (error) { await services.stop(); @@ -322,6 +326,7 @@ export async function createApp< plugins?: T; telemetry?: TelemetryConfig; cache?: CacheConfig; + task?: TaskConfig | false; client?: WorkspaceClient; onPluginsReady?: (appkit: PluginMap) => void | Promise; disableInternalTelemetry?: boolean; diff --git a/packages/appkit/src/core/graceful-shutdown.ts b/packages/appkit/src/core/graceful-shutdown.ts new file mode 100644 index 00000000..fc3588c4 --- /dev/null +++ b/packages/appkit/src/core/graceful-shutdown.ts @@ -0,0 +1,63 @@ +import type { BasePlugin } from "shared"; +import { createLogger } from "../logging/logger"; +import type { ServiceManager } from "./service-manager"; + +const log = createLogger("appkit.shutdown"); + +const FORCE_SHUTDOWN_MS = 15_000; + +type GracefulServer = BasePlugin & { + gracefulClose?: () => Promise; +}; + +/** + * SIGTERM/SIGINT: drain HTTP (if the server plugin exposes it), then stop core + * services. Force-exits after {@link FORCE_SHUTDOWN_MS} if stuck; repeated + * signals are ignored while a run is in flight. 
+ */ +export function registerGracefulShutdownHandlers( + serverPlugin: BasePlugin | undefined, + services: ServiceManager, +): void { + let running = false; + + const shutdown = async () => { + if (running) return; + running = true; + + const forceExit = setTimeout(() => { + log.error( + "Shutdown timed out after %dms; forcing exit", + FORCE_SHUTDOWN_MS, + ); + process.exit(1); + }, FORCE_SHUTDOWN_MS); + forceExit.unref(); + + try { + await maybeCloseServer(serverPlugin as GracefulServer | undefined); + await services.stop(); + clearTimeout(forceExit); + process.exit(0); + } catch (err) { + log.error("Shutdown failed: %O", err); + clearTimeout(forceExit); + process.exit(1); + } + }; + + const onSignal = () => { + void shutdown(); + }; + + process.on("SIGTERM", onSignal); + process.on("SIGINT", onSignal); +} + +async function maybeCloseServer( + serverPlugin: GracefulServer | undefined, +): Promise { + const close = serverPlugin?.gracefulClose; + if (typeof close !== "function") return; + await close.call(serverPlugin); +} diff --git a/packages/appkit/src/core/service-manager.ts b/packages/appkit/src/core/service-manager.ts index 05b6a078..319413fb 100644 --- a/packages/appkit/src/core/service-manager.ts +++ b/packages/appkit/src/core/service-manager.ts @@ -1,6 +1,7 @@ import type { CacheConfig } from "shared"; import { CacheManager } from "../cache"; import { createLogger } from "../logging"; +import { type TaskConfig, TaskManager } from "../tasks"; import { type TelemetryConfig, TelemetryManager } from "../telemetry"; const logger = createLogger("services"); @@ -55,11 +56,13 @@ export class ServiceManager { export async function startCoreServices(config: { telemetry?: TelemetryConfig; cache?: CacheConfig; + task?: TaskConfig | false; }): Promise { const services = new ServiceManager(); try { services.add("telemetry", await TelemetryManager.boot(config.telemetry)); services.add("cache", await CacheManager.boot(config.cache)); + services.add("task", await 
TaskManager.boot(config.task)); return services; } catch (error) { await services.stop(); diff --git a/packages/appkit/src/core/tests/graceful-shutdown.test.ts b/packages/appkit/src/core/tests/graceful-shutdown.test.ts new file mode 100644 index 00000000..5a45bfeb --- /dev/null +++ b/packages/appkit/src/core/tests/graceful-shutdown.test.ts @@ -0,0 +1,187 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; + +const { mockLogger } = vi.hoisted(() => ({ + mockLogger: { + info: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), + error: vi.fn(), + event: vi.fn(), + }, +})); + +vi.mock("../../logging/logger", () => ({ + createLogger: () => mockLogger, +})); + +import { registerGracefulShutdownHandlers } from "../graceful-shutdown"; + +type SignalListener = NodeJS.SignalsListener; + +function makeServiceManager() { + return { + add: vi.fn(), + get: vi.fn().mockReturnValue(null), + stop: vi.fn(async () => {}), + } as unknown as import("../service-manager").ServiceManager & { + stop: ReturnType; + }; +} + +/** + * Snapshot SIGTERM/SIGINT listeners before each test and surgically remove + * anything that registered during the test. Avoids stepping on vitest's + * own signal handlers. 
+ */ +function captureNewSignalListeners(): () => void { + const before = { + SIGTERM: new Set(process.listeners("SIGTERM")), + SIGINT: new Set(process.listeners("SIGINT")), + }; + return () => { + for (const signal of ["SIGTERM", "SIGINT"] as const) { + for (const listener of process.listeners(signal)) { + if (!before[signal].has(listener)) { + process.removeListener(signal, listener as SignalListener); + } + } + } + }; +} + +function getRegisteredHandler(signal: "SIGTERM" | "SIGINT"): SignalListener { + const listeners = process.listeners(signal); + if (listeners.length === 0) { + throw new Error(`No ${signal} listener registered`); + } + return listeners[listeners.length - 1] as SignalListener; +} + +describe("registerGracefulShutdownHandlers", () => { + let exitSpy: ReturnType; + let cleanupListeners: () => void; + + beforeEach(() => { + mockLogger.error.mockClear(); + cleanupListeners = captureNewSignalListeners(); + exitSpy = vi.fn(); + vi.spyOn(process, "exit").mockImplementation(((code?: number) => { + exitSpy(code); + return undefined as never; + }) as never); + }); + + afterEach(() => { + cleanupListeners(); + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + test("registers both SIGTERM and SIGINT handlers", () => { + const services = makeServiceManager(); + registerGracefulShutdownHandlers(undefined, services); + + expect(() => getRegisteredHandler("SIGTERM")).not.toThrow(); + expect(() => getRegisteredHandler("SIGINT")).not.toThrow(); + }); + + test("drains server, stops services in order, exits with 0", async () => { + const gracefulClose = vi.fn(async () => {}); + const serverPlugin = { name: "server", gracefulClose } as never; + const services = makeServiceManager(); + + registerGracefulShutdownHandlers(serverPlugin, services); + getRegisteredHandler("SIGTERM")("SIGTERM"); + + // Yield twice: once for the async shutdown function to start, once for + // its awaits (close + stop) to resolve through their microtask queues. 
+ await new Promise((r) => setImmediate(r)); + await new Promise((r) => setImmediate(r)); + + expect(gracefulClose).toHaveBeenCalledTimes(1); + expect(services.stop).toHaveBeenCalledTimes(1); + expect(gracefulClose.mock.invocationCallOrder[0]).toBeLessThan( + (services.stop as ReturnType).mock.invocationCallOrder[0], + ); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("server plugin without gracefulClose is skipped, services still stop", async () => { + const services = makeServiceManager(); + const pluginWithoutClose = { name: "noop" } as never; + + registerGracefulShutdownHandlers(pluginWithoutClose, services); + getRegisteredHandler("SIGTERM")("SIGTERM"); + + await new Promise((r) => setImmediate(r)); + await new Promise((r) => setImmediate(r)); + + expect(services.stop).toHaveBeenCalledTimes(1); + expect(exitSpy).toHaveBeenCalledWith(0); + }); + + test("repeated signals during shutdown are coalesced", async () => { + let resolveClose!: () => void; + const gracefulClose = vi.fn( + () => + new Promise((r) => { + resolveClose = r; + }), + ); + const services = makeServiceManager(); + const serverPlugin = { name: "server", gracefulClose } as never; + + registerGracefulShutdownHandlers(serverPlugin, services); + const handler = getRegisteredHandler("SIGTERM"); + + handler("SIGTERM"); + handler("SIGTERM"); + handler("SIGTERM"); + await new Promise((r) => setImmediate(r)); + + expect(gracefulClose).toHaveBeenCalledTimes(1); + + resolveClose(); + await new Promise((r) => setImmediate(r)); + await new Promise((r) => setImmediate(r)); + expect(services.stop).toHaveBeenCalledTimes(1); + }); + + test("services.stop() failure logs and exits with 1", async () => { + const services = makeServiceManager(); + (services.stop as ReturnType).mockRejectedValueOnce( + new Error("boom"), + ); + + registerGracefulShutdownHandlers(undefined, services); + getRegisteredHandler("SIGTERM")("SIGTERM"); + + await new Promise((r) => setImmediate(r)); + await new Promise((r) => 
setImmediate(r)); + + expect(mockLogger.error).toHaveBeenCalledWith( + "Shutdown failed: %O", + expect.any(Error), + ); + expect(exitSpy).toHaveBeenCalledWith(1); + }); + + test("force-exits when shutdown hangs past FORCE_SHUTDOWN_MS", async () => { + vi.useFakeTimers(); + const gracefulClose = vi.fn(() => new Promise(() => {})); + const services = makeServiceManager(); + const serverPlugin = { name: "server", gracefulClose } as never; + + registerGracefulShutdownHandlers(serverPlugin, services); + getRegisteredHandler("SIGTERM")("SIGTERM"); + + await Promise.resolve(); + vi.advanceTimersByTime(15_000); + + expect(exitSpy).toHaveBeenCalledWith(1); + expect(mockLogger.error).toHaveBeenCalledWith( + expect.stringContaining("Shutdown timed out"), + 15_000, + ); + }); +}); diff --git a/packages/appkit/src/index.ts b/packages/appkit/src/index.ts index 00fd6ff8..408a4443 100644 --- a/packages/appkit/src/index.ts +++ b/packages/appkit/src/index.ts @@ -14,6 +14,15 @@ export type { StreamExecutionSettings, } from "shared"; export { isSQLTypeMarker, sql } from "shared"; +export type { + ResumeOptions, + StopOptions, + StreamEvent, + SubmitOptions, + Task, + TaskEvent, + TaskHandle, +} from "../vendor/taskflow/taskflow.js"; export { CacheManager } from "./cache"; export type { JobsConnectorConfig } from "./connectors/jobs"; export type { @@ -96,6 +105,8 @@ export { ResourceRegistry, ResourceType, } from "./registry"; +export type { TaskConfig } from "./tasks"; +export { TaskManager } from "./tasks"; // Telemetry (for advanced custom telemetry) export { type Counter, diff --git a/packages/appkit/src/plugin/plugin.ts b/packages/appkit/src/plugin/plugin.ts index d8801aba..8f13c91b 100644 --- a/packages/appkit/src/plugin/plugin.ts +++ b/packages/appkit/src/plugin/plugin.ts @@ -24,6 +24,7 @@ import type { PluginContext } from "../core/plugin-context"; import { AppKitError, AuthenticationError } from "../errors"; import { createLogger } from "../logging/logger"; import { 
StreamManager } from "../stream"; +import { TaskManager } from "../tasks"; import { type ITelemetry, normalizeTelemetryOptions, @@ -187,6 +188,8 @@ export abstract class Plugin< protected telemetry!: ITelemetry; protected context?: PluginContext; + protected task: TaskManager | null = null; + /** Registered endpoints for this plugin */ private registeredEndpoints: PluginEndpointMap = {}; @@ -232,6 +235,7 @@ export abstract class Plugin< this.name, this.config.telemetry, ); + this.task = TaskManager.getInstanceSync(); this.isReady = true; } @@ -242,16 +246,19 @@ export abstract class Plugin< telemetryConfig?: BasePluginConfig["telemetry"]; } = {}, ): void { + const { services, telemetryConfig, context } = deps; this.cache = - deps.services?.get("cache") ?? + services?.get("cache") ?? this.cache ?? CacheManager.getInstanceSync(); this.telemetry = TelemetryManager.getProvider( this.name, - deps.telemetryConfig ?? this.config.telemetry, + telemetryConfig ?? this.config.telemetry, ); - if (deps.context !== undefined) { - this.context = deps.context as PluginContext; + this.task = + services?.get("task") ?? TaskManager.getInstanceSync(); + if (context !== undefined) { + this.context = context as PluginContext; } this.isReady = true; } diff --git a/packages/appkit/src/plugins/server/index.ts b/packages/appkit/src/plugins/server/index.ts index e66abf5a..f2a523e3 100644 --- a/packages/appkit/src/plugins/server/index.ts +++ b/packages/appkit/src/plugins/server/index.ts @@ -102,12 +102,8 @@ export class ServerPlugin extends Plugin { } /** - * Start the server. - * - * This method starts the server and sets up the frontend. - * It also sets up the remote tunneling if enabled. - * - * @returns The express application. + * Starts the server and registers the frontend (and, in dev, the + * remote tunnel). */ async start(): Promise { this.serverApplication.use(requestMetricsMiddleware); @@ -121,10 +117,9 @@ export class ServerPlugin extends Plugin { // more headroom. 
limit: this.config.bodyLimit ?? "1mb", type: (req) => { - // Skip JSON parsing for routes that declared skipBodyParsing - // (e.g. file uploads where the raw body must flow through). - // rawBodyPaths is populated by extendRoutes() below; the type - // callback runs per-request so the set is already filled. + // Skip JSON parsing for routes that opted out (e.g. file + // uploads). `rawBodyPaths` is populated by `extendRoutes()` + // before any request hits this callback. const urlPath = req.url?.split("?")[0]; if (urlPath && this.rawBodyPaths.has(urlPath)) return false; const ct = req.headers["content-type"] ?? ""; @@ -160,9 +155,6 @@ export class ServerPlugin extends Plugin { // attach server to remote tunnel controller this.remoteTunnelController.setServer(server); - process.on("SIGTERM", () => this._gracefulShutdown()); - process.on("SIGINT", () => this._gracefulShutdown()); - if (process.env.NODE_ENV === "development") { const allRoutes = getRoutes(this.serverApplication._router.stack); printRoutes(allRoutes); @@ -170,6 +162,47 @@ export class ServerPlugin extends Plugin { return this.serverApplication; } + /** + * Stops Vite/tunnel/reporter, aborts in-flight plugin work, then closes HTTP. 
+ */ + async gracefulClose(): Promise { + logger.info("Closing server..."); + await this.closeDevStack(); + this.abortAllPluginOperations(); + await this.closeHttpServer(); + } + + private async closeDevStack(): Promise { + if (this.viteDevServer) await this.viteDevServer.close(); + if (this.remoteTunnelController) this.remoteTunnelController.cleanup(); + TelemetryReporter.getInstance()?.stop(); + } + + private abortAllPluginOperations(): void { + const plugins = this.context?.getPlugins(); + if (!plugins) return; + for (const plugin of plugins.values()) { + if (!plugin.abortActiveOperations) continue; + try { + plugin.abortActiveOperations(); + } catch (err) { + logger.error( + "Error aborting operations for plugin %s: %O", + plugin.name, + err, + ); + } + } + } + + private async closeHttpServer(): Promise { + if (!this.server) return; + await new Promise((resolve) => { + this.server?.close(() => resolve()); + }); + logger.debug("Server closed gracefully"); + } + /** * Get the low level node.js http server instance. * @@ -398,54 +431,6 @@ export class ServerPlugin extends Plugin { } } - private async _gracefulShutdown() { - logger.info("Starting graceful shutdown..."); - - if (this.viteDevServer) { - await this.viteDevServer.close(); - } - - if (this.remoteTunnelController) { - this.remoteTunnelController.cleanup(); - } - - TelemetryReporter.getInstance()?.stop(); - - // 1. abort active operations from plugins - const shutdownPlugins = this.context?.getPlugins(); - if (shutdownPlugins) { - for (const plugin of shutdownPlugins.values()) { - if (plugin.abortActiveOperations) { - try { - plugin.abortActiveOperations(); - } catch (err) { - logger.error( - "Error aborting operations for plugin %s: %O", - plugin.name, - err, - ); - } - } - } - } - - // 2. close the server - if (this.server) { - this.server.close(() => { - logger.debug("Server closed gracefully"); - process.exit(0); - }); - - // 3. 
timeout to force shutdown after 15 seconds - setTimeout(() => { - logger.debug("Force shutdown after timeout"); - process.exit(1); - }, 15000); - } else { - process.exit(0); - } - } - /** * Returns the public exports for the server plugin. * Exposes server management methods. diff --git a/packages/appkit/src/plugins/server/tests/server.test.ts b/packages/appkit/src/plugins/server/tests/server.test.ts index bbc96172..93068ac7 100644 --- a/packages/appkit/src/plugins/server/tests/server.test.ts +++ b/packages/appkit/src/plugins/server/tests/server.test.ts @@ -694,13 +694,9 @@ describe("ServerPlugin", () => { }); }); - describe("_gracefulShutdown", () => { + describe("gracefulClose", () => { test("aborts plugin operations (with error isolation) and closes server", async () => { - vi.useFakeTimers(); mockLoggerError.mockClear(); - const exitSpy = vi - .spyOn(process, "exit") - .mockImplementation(((_code?: number) => undefined) as any); const plugin = new ServerPlugin({ context: createContextWithPlugins({ @@ -717,18 +713,47 @@ describe("ServerPlugin", () => { }), } as any); - // pretend started (plugin as any).server = mockHttpServer; - await (plugin as any)._gracefulShutdown(); - vi.runAllTimers(); + await plugin.gracefulClose(); expect(mockLoggerError).toHaveBeenCalled(); expect(mockHttpServer.close).toHaveBeenCalled(); - expect(exitSpy).toHaveBeenCalled(); + }); + + test("resolves only after server.close callback fires", async () => { + let closeCb: (() => void) | undefined; + const closeSpy = vi.fn((cb: any) => { + closeCb = cb; + }); + + const plugin = new ServerPlugin({ + context: createContextWithPlugins({}), + } as any); + + (plugin as any).server = { ...mockHttpServer, close: closeSpy }; + + let resolved = false; + const done = plugin.gracefulClose().then(() => { + resolved = true; + }); + + // Wait a microtask; the server hasn't fired its close cb yet. 
+ await Promise.resolve(); + expect(resolved).toBe(false); + + closeCb?.(); + await done; + expect(resolved).toBe(true); + }); + + test("returns immediately when server never started", async () => { + const plugin = new ServerPlugin({ + context: createContextWithPlugins({}), + } as any); - exitSpy.mockRestore(); - vi.useRealTimers(); + // No `this.server` assigned. + await expect(plugin.gracefulClose()).resolves.toBeUndefined(); }); }); }); diff --git a/packages/appkit/src/tasks/defaults.ts b/packages/appkit/src/tasks/defaults.ts new file mode 100644 index 00000000..7323ea38 --- /dev/null +++ b/packages/appkit/src/tasks/defaults.ts @@ -0,0 +1,36 @@ +import type { TaskflowConfig as VendorTaskConfig } from "../../vendor/taskflow/taskflow.js"; + +export type TaskConfig = VendorTaskConfig; + +/** AppKit defaults; paths live under `.appkit/tasks/`. */ +export const taskDefaults: TaskConfig = { + engine: { + walPath: ".appkit/tasks/wal", + recoveryIntervalMs: 5000, + staleThresholdMs: 30000, + enableTestMode: false, + }, + executor: { + heartbeatIntervalMs: 5000, + }, + storage: { + backend: "sqlite", + databasePath: ".appkit/tasks/tasks.db", + }, +}; + +/** Merges AppKit defaults with user config; explicit user fields win. */ +export function mergeTaskDefaults(user: TaskConfig | undefined): TaskConfig { + if (!user) return taskDefaults; + const merged: TaskConfig = { + ...taskDefaults, + ...user, + engine: { ...taskDefaults.engine, ...user.engine }, + executor: { ...taskDefaults.executor, ...user.executor }, + storage: user.storage ?? 
taskDefaults.storage, + }; + if (user.wal) merged.wal = { ...user.wal }; + if (user.admission) merged.admission = { ...user.admission }; + if (user.stream) merged.stream = { ...user.stream }; + return merged; +} diff --git a/packages/appkit/src/tasks/index.ts b/packages/appkit/src/tasks/index.ts new file mode 100644 index 00000000..3cc8a422 --- /dev/null +++ b/packages/appkit/src/tasks/index.ts @@ -0,0 +1,2 @@ +export type { TaskConfig } from "./defaults"; +export { TaskManager } from "./manager"; diff --git a/packages/appkit/src/tasks/manager.ts b/packages/appkit/src/tasks/manager.ts new file mode 100644 index 00000000..2b5a470c --- /dev/null +++ b/packages/appkit/src/tasks/manager.ts @@ -0,0 +1,317 @@ +/** + * TaskManager — durable execution core service. + * + * Wraps the vendored TaskFlow Node.js bindings (Rust + napi). Booted by + * `createApp` and exposed to plugins as `this.task`. Default storage is + * SQLite at `.appkit/tasks/tasks.db`; opt out with + * `createApp({ task: false })`. + */ + +import type { + ResumeOptions, + StopOptions, + StreamEvent, + SubmitOptions, + Task, + Engine as TaskflowEngine, + TaskHandle, +} from "../../vendor/taskflow/taskflow.js"; +import { createLogger } from "../logging/logger"; +import { + type Counter, + SpanStatusCode, + TelemetryManager, + type TelemetryProvider, +} from "../telemetry"; +import { mergeTaskDefaults, type TaskConfig } from "./defaults"; +import { loadVendorModule } from "./vendor-loader"; + +const logger = createLogger("tasks"); + +interface TaskCounters { + starts: Counter; + reconnects: Counter; + resumes: Counter; + stops: Counter; + subscriptions: Counter; +} + +/** + * Single instance per AppKit app, booted by `createApp` and exposed to + * plugins as `this.task`. + */ +export class TaskManager { + private static _instance: TaskManager | null = null; + + /** + * Raw vendor engine. Escape hatch for advanced callers (e.g. registering + * a task definition). 
Pass-through methods on this manager add tracing + * and metrics; direct engine calls skip both. + */ + readonly engine: TaskflowEngine; + + private readonly telemetry: TelemetryProvider; + private readonly metrics: TaskCounters; + private hasShutdown = false; + + private constructor(engine: TaskflowEngine) { + this.engine = engine; + this.telemetry = TelemetryManager.getProvider("tasks"); + const meter = this.telemetry.getMeter(); + this.metrics = { + starts: meter.createCounter("tasks.starts", { + description: "Tasks submitted via TaskManager.start", + unit: "1", + }), + reconnects: meter.createCounter("tasks.reconnects", { + description: "Reconnect lookups (labelled by found|notfound)", + unit: "1", + }), + resumes: meter.createCounter("tasks.resumes", { + description: "Resume attempts (labelled by found|notfound)", + unit: "1", + }), + stops: meter.createCounter("tasks.stops", { + description: "Stop calls (always counted; idempotent)", + unit: "1", + }), + subscriptions: meter.createCounter("tasks.subscriptions", { + description: "Stream subscriptions opened via TaskManager.subscribe", + unit: "1", + }), + }; + } + + /** + * Bootstraps the service. Pass `false` to opt out (returns `null`). + * Idempotent: subsequent calls return the existing instance. + */ + static async initialize( + config: TaskConfig | false | undefined, + ): Promise { + if (config === false) { + logger.info("Tasks disabled via createApp({ task: false })."); + TaskManager._instance = null; + return null; + } + if (TaskManager._instance) return TaskManager._instance; + + const merged = mergeTaskDefaults(config); + logger.debug("Initializing task engine", { config: merged }); + warnOnEphemeralStorage(merged); + const vendor = await loadVendorModule(); + const engine = await vendor.Engine.create(merged); + const service = new TaskManager(engine); + TaskManager._instance = service; + return service; + } + + /** Bootstraps the service. Returns `null` when opted out. 
*/ + static async boot( + config: TaskConfig | false | undefined, + ): Promise<{ instance: TaskManager; stop(): Promise } | null> { + const service = await TaskManager.initialize(config); + if (!service) return null; + return { instance: service, stop: () => service.shutdown() }; + } + + /** Returns the live instance or `null` when the app opted out. */ + static getInstanceSync(): TaskManager | null { + return TaskManager._instance; + } + + /** + * Spawns a new task attempt. Returns a handle even when a task with the + * same idempotency key already exists — dedup is resolved by the engine + * based on `executeMode`. + */ + async start( + name: string, + input: unknown, + options: SubmitOptions = {}, + ): Promise { + this.assertAlive(); + return this.telemetry.startActiveSpan( + "tasks.start", + { attributes: { "task.name": name } }, + async (span) => { + try { + const handle = await this.engine.submit(name, input, options); + span.setAttribute("task.id", handle.taskId); + span.setStatus({ code: SpanStatusCode.OK }); + this.metrics.starts.add(1, { "task.name": name }); + return handle; + } catch (err) { + span.setStatus({ code: SpanStatusCode.ERROR }); + throw err; + } + }, + ); + } + + /** + * Returns the current task record, or `null` if not found / unauthorized. + * + * **Auth contract:** the engine does NOT authenticate. The `userId` + * passed here is matched against the submit-time owner only to gate + * existence — the embedder MUST verify the caller's identity at the + * route layer (e.g. from `x-forwarded-user`) before forwarding it. 
+ */ + async reconnect( + idempotencyKey: string, + userId?: string, + ): Promise { + this.assertAlive(); + return this.telemetry.startActiveSpan( + "tasks.reconnect", + { attributes: { "task.idempotencyKey": idempotencyKey } }, + async (span) => { + const task = await this.engine.reconnect(idempotencyKey, userId); + span.setAttribute("task.found", task !== null); + span.setStatus({ code: SpanStatusCode.OK }); + this.metrics.reconnects.add(1, { + result: task ? "found" : "notfound", + }); + return task; + }, + ); + } + + /** + * Revives a suspended task — after a deliberate `stop()` or, for an OBO + * task, after a crash (auto-recovery is disabled in that case). + * + * **Auth contract:** see {@link reconnect}. The engine never reveals + * existence to a caller whose `userId` mismatches the owner; verify + * the value at the route layer before forwarding. + */ + async resume( + idempotencyKey: string, + options: ResumeOptions = {}, + ): Promise { + this.assertAlive(); + return this.telemetry.startActiveSpan( + "tasks.resume", + { attributes: { "task.idempotencyKey": idempotencyKey } }, + async (span) => { + const task = await this.engine.resume(idempotencyKey, options); + span.setAttribute("task.found", task !== null); + span.setStatus({ code: SpanStatusCode.OK }); + this.metrics.resumes.add(1, { result: task ? "found" : "notfound" }); + return task; + }, + ); + } + + /** + * Cooperative stop. Emits a `suspended` event. Idempotent. + * + * **Auth contract:** see {@link reconnect}. A mismatched `userId` + * surfaces as `TaskNotFound`; verify the value at the route layer + * before forwarding. 
+ */ + async stop( + idempotencyKey: string, + options: StopOptions = {}, + ): Promise { + this.assertAlive(); + return this.telemetry.startActiveSpan( + "tasks.stop", + { attributes: { "task.idempotencyKey": idempotencyKey } }, + async (span) => { + try { + const handle = await this.engine.stop(idempotencyKey, options); + span.setStatus({ code: SpanStatusCode.OK }); + this.metrics.stops.add(1); + return handle; + } catch (err) { + span.setStatus({ code: SpanStatusCode.ERROR }); + throw err; + } + }, + ); + } + + /** Drains in-flight tasks and shuts the engine down. Idempotent. */ + async shutdown(): Promise { + if (this.hasShutdown) return; + this.hasShutdown = true; + logger.info("Shutting down task engine"); + await this.engine.shutdown(); + if (TaskManager._instance === this) { + TaskManager._instance = null; + } + } + + /** + * Async iterable of `StreamEvent`s ordered by sequence number. Pass + * `lastSeq` to resume from a known position (SSE reconnection). + */ + subscribe( + idempotencyKey: string, + lastSeq?: number, + ): AsyncIterableIterator { + this.assertAlive(); + this.metrics.subscriptions.add(1); + return this.engine.subscribe(idempotencyKey, lastSeq); + } + + /** + * Test-only: aborts the executor mid-run without writing a terminal + * event so reconnect/recovery exercises the crash path. Throws unless + * `engine.enableTestMode: true` was set at boot. + * + * @internal + */ + simulateCrash(idempotencyKey: string): void { + this.assertAlive(); + this.engine.simulateCrash(idempotencyKey); + } + + /** + * Test-only singleton reset. Hard-fails in production. Shuts down the + * previous engine before zeroing the pointer so workers and storage + * handles don't leak across tests. 
+ * + * @internal + */ + static async _resetForTests(): Promise { + if (process.env.NODE_ENV === "production") { + throw new Error( + "TaskManager._resetForTests() is test-only and refuses to run when NODE_ENV=production.", + ); + } + const prev = TaskManager._instance; + TaskManager._instance = null; + if (prev) await prev.shutdown().catch(() => {}); + } + + private assertAlive(): void { + if (this.hasShutdown) { + throw new Error("TaskManager has been shut down."); + } + } +} + +/** + * Warns when SQLite is paired with a Databricks Apps environment — the + * per-pod filesystem cannot survive rolling restarts, so durability + * silently degrades. We can't refuse to boot since single-process dev + * looks identical at the config level. + */ +function warnOnEphemeralStorage(config: TaskConfig): void { + const isDatabricksApps = + !!process.env.DATABRICKS_APP_NAME || + !!process.env.DATABRICKS_APP_ID || + !!process.env.DATABRICKS_APP_URL; + if (!isDatabricksApps) return; + const backend = config.storage?.backend ?? "sqlite"; + if (backend !== "sqlite") return; + logger.warn( + "Tasks configured with the SQLite backend but the runtime appears " + + "to be Databricks Apps (multi-pod, no shared volume). Tasks will " + + "not survive rolling restarts. 
For production, switch the backend " + + "to `lakebase` (Postgres) via `task: { storage: { backend: " + + "'lakebase', connectionString: … } }`.", + ); +} diff --git a/packages/appkit/src/tasks/tests/manager.test.ts b/packages/appkit/src/tasks/tests/manager.test.ts new file mode 100644 index 00000000..bc216261 --- /dev/null +++ b/packages/appkit/src/tasks/tests/manager.test.ts @@ -0,0 +1,227 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; + +const { engineSubmit, engineShutdown, engineCreate } = vi.hoisted(() => { + const engineSubmit = vi.fn(async () => ({ + taskId: "t-1", + idempotencyKey: "ik-1", + })); + const engineShutdown = vi.fn(async () => {}); + const engineCreate = vi.fn(async (_config: unknown) => ({ + submit: engineSubmit, + reconnect: vi.fn(), + resume: vi.fn(), + stop: vi.fn(), + cancelTask: vi.fn(), + subscribe: vi.fn(), + shutdown: engineShutdown, + simulateCrash: vi.fn(), + registerTask: vi.fn(), + })); + return { engineSubmit, engineShutdown, engineCreate }; +}); + +vi.mock("../vendor-loader", () => ({ + loadVendorModule: vi.fn(async () => ({ + Engine: { create: engineCreate }, + })), +})); + +const { taskLogger } = vi.hoisted(() => ({ + taskLogger: { + info: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), + error: vi.fn(), + event: vi.fn(), + }, +})); + +vi.mock("../../logging/logger", () => ({ + createLogger: () => taskLogger, +})); + +vi.mock("../../telemetry", () => ({ + SpanStatusCode: { OK: 1, ERROR: 2 }, + TelemetryManager: { + getProvider: vi.fn().mockReturnValue({ + getMeter: vi.fn().mockReturnValue({ + createCounter: vi.fn().mockReturnValue({ add: vi.fn() }), + }), + startActiveSpan: vi.fn(async (_name, _opts, fn) => { + const span = { + setAttribute: vi.fn(), + setStatus: vi.fn(), + recordException: vi.fn(), + end: vi.fn(), + }; + return fn(span); + }), + }), + }, +})); + +import { mergeTaskDefaults, taskDefaults } from "../defaults"; +import { TaskManager } from "../manager"; + +describe("TaskManager", () 
=> { + beforeEach(() => { + engineSubmit.mockClear(); + engineShutdown.mockClear(); + engineCreate.mockClear(); + }); + + afterEach(async () => { + await TaskManager._resetForTests(); + }); + + describe("opt-out", () => { + test("returns null when config is false", async () => { + const result = await TaskManager.initialize(false); + expect(result).toBeNull(); + expect(engineCreate).not.toHaveBeenCalled(); + expect(TaskManager.getInstanceSync()).toBeNull(); + }); + + test("boot() returns null when config is false", async () => { + const result = await TaskManager.boot(false); + expect(result).toBeNull(); + }); + }); + + describe("initialize", () => { + test("is idempotent — second call returns the same instance", async () => { + const a = await TaskManager.initialize(undefined); + const b = await TaskManager.initialize(undefined); + expect(a).toBe(b); + expect(engineCreate).toHaveBeenCalledTimes(1); + }); + + test("merges user config over defaults", async () => { + await TaskManager.initialize({ + storage: { backend: "lakebase", connectionString: "postgres://x" }, + }); + expect(engineCreate).toHaveBeenCalledWith( + expect.objectContaining({ + engine: expect.objectContaining({ recoveryIntervalMs: 5000 }), + storage: { backend: "lakebase", connectionString: "postgres://x" }, + }), + ); + }); + }); + + describe("getInstanceSync", () => { + test("returns null before initialization", () => { + expect(TaskManager.getInstanceSync()).toBeNull(); + }); + + test("returns the instance after initialize", async () => { + const instance = await TaskManager.initialize(undefined); + expect(TaskManager.getInstanceSync()).toBe(instance); + }); + }); + + describe("shutdown", () => { + test("is idempotent — second call is a no-op", async () => { + const instance = (await TaskManager.initialize(undefined)) as TaskManager; + await instance.shutdown(); + await instance.shutdown(); + expect(engineShutdown).toHaveBeenCalledTimes(1); + }); + + test("rejects further calls on a shut-down 
instance", async () => { + const instance = (await TaskManager.initialize(undefined)) as TaskManager; + await instance.shutdown(); + await expect(instance.start("foo", {})).rejects.toThrow( + /has been shut down/, + ); + }); + }); + + describe("_resetForTests", () => { + test("shuts down the previous engine before zeroing the pointer", async () => { + await TaskManager.initialize(undefined); + await TaskManager._resetForTests(); + expect(engineShutdown).toHaveBeenCalledTimes(1); + expect(TaskManager.getInstanceSync()).toBeNull(); + }); + + test("refuses to run when NODE_ENV=production", async () => { + await TaskManager.initialize(undefined); + const orig = process.env.NODE_ENV; + process.env.NODE_ENV = "production"; + try { + await expect(TaskManager._resetForTests()).rejects.toThrow(/test-only/); + } finally { + process.env.NODE_ENV = orig; + } + }); + }); + + describe("warnOnEphemeralStorage", () => { + test("warns when SQLite + Databricks Apps env is detected", async () => { + taskLogger.warn.mockClear(); + const orig = process.env.DATABRICKS_APP_NAME; + process.env.DATABRICKS_APP_NAME = "myapp"; + try { + await TaskManager.initialize(undefined); + expect(taskLogger.warn).toHaveBeenCalledWith( + expect.stringContaining("Tasks configured with the SQLite backend"), + ); + } finally { + if (orig === undefined) delete process.env.DATABRICKS_APP_NAME; + else process.env.DATABRICKS_APP_NAME = orig; + } + }); + + test("does not warn on non-sqlite backend", async () => { + taskLogger.warn.mockClear(); + const orig = process.env.DATABRICKS_APP_NAME; + process.env.DATABRICKS_APP_NAME = "myapp"; + try { + await TaskManager.initialize({ + storage: { backend: "lakebase", connectionString: "postgres://x" }, + }); + expect(taskLogger.warn).not.toHaveBeenCalled(); + } finally { + if (orig === undefined) delete process.env.DATABRICKS_APP_NAME; + else process.env.DATABRICKS_APP_NAME = orig; + } + }); + }); +}); + +describe("mergeTaskDefaults", () => { + test("returns defaults 
when user config is undefined", () => { + expect(mergeTaskDefaults(undefined)).toBe(taskDefaults); + }); + + test("replaces storage wholesale (discriminated union)", () => { + const merged = mergeTaskDefaults({ + storage: { backend: "lakebase", connectionString: "postgres://x" }, + }); + expect(merged.storage).toEqual({ + backend: "lakebase", + connectionString: "postgres://x", + }); + // The sqlite-specific `databasePath` from defaults must NOT leak in. + expect( + (merged.storage as Record).databasePath, + ).toBeUndefined(); + }); + + test("shallow-merges engine and executor fields", () => { + const merged = mergeTaskDefaults({ + engine: { recoveryIntervalMs: 9999 }, + }); + expect(merged.engine?.recoveryIntervalMs).toBe(9999); + // Default field preserved. + expect(merged.engine?.staleThresholdMs).toBe(30000); + }); + + test("only emits wal/admission/stream when present in user config", () => { + const merged = mergeTaskDefaults({}); + expect(merged.wal).toBeUndefined(); + expect(merged.admission).toBeUndefined(); + expect(merged.stream).toBeUndefined(); + }); +}); diff --git a/packages/appkit/src/tasks/vendor-loader.ts b/packages/appkit/src/tasks/vendor-loader.ts new file mode 100644 index 00000000..6bfe5b23 --- /dev/null +++ b/packages/appkit/src/tasks/vendor-loader.ts @@ -0,0 +1,139 @@ +import { createHash } from "node:crypto"; +import { readFile, stat } from "node:fs/promises"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { InitializationError } from "../errors"; + +/** Type-only import keeps `import "@databricks/appkit"` from touching the native binary. 
/** Type-only import keeps `import "@databricks/appkit"` from touching the native binary. */
type VendorModule = typeof import("../../vendor/taskflow/taskflow.js");

/** Memoised result of the first successful load; `null` until then. */
let cachedVendor: VendorModule | null = null;

/** Shape of `VENDOR.json`; every field is optional because the file is external input. */
interface VendorManifest {
  name?: string;
  version?: string;
  description?: string;
  /** Native binaries keyed by `${process.platform}-${process.arch}`. */
  platforms?: Record<string, { file: string; sha256: string }>;
  loader?: { file: string; sha256: string };
  types?: { file: string; sha256: string };
}

/**
 * Lazy-loads the vendored binary so opting out via `createApp({ task: false })`
 * does not require a prebuilt binary for the current platform.
 *
 * Integrity is verified by default in `NODE_ENV=production` and skipped
 * elsewhere. Override with `APPKIT_VERIFY_TASKFLOW_VENDOR=1` / `=0`.
 *
 * @returns The vendored module; later calls return the cached instance.
 * @throws InitializationError when the integrity check fails or the module
 *   cannot be imported on this platform.
 *
 * @internal
 */
export async function loadVendorModule(): Promise<VendorModule> {
  if (cachedVendor) return cachedVendor;
  try {
    if (shouldVerifyVendor()) await verifyVendorIntegrity();
    cachedVendor = (await import(
      "../../vendor/taskflow/taskflow.js"
    )) as VendorModule;
    return cachedVendor;
  } catch (err) {
    // Integrity failures already carry a precise message; pass them through.
    if (err instanceof InitializationError) throw err;
    // Anything else (missing binary, loader failure) is wrapped with the
    // platform so the user can tell which prebuild is absent.
    const message = (err as { message?: string } | undefined)?.message ?? err;
    throw new InitializationError(
      `TaskFlow native binary unavailable for ${process.platform}-${process.arch}: ${message}`,
      { cause: err instanceof Error ? err : undefined },
    );
  }
}

/**
 * Decides whether the vendored files are hashed before loading.
 * `APPKIT_VERIFY_TASKFLOW_VENDOR` wins when it is exactly `"1"` or `"0"`;
 * any other value falls back to "verify only when NODE_ENV is production".
 */
function shouldVerifyVendor(): boolean {
  const override = process.env.APPKIT_VERIFY_TASKFLOW_VENDOR;
  if (override === "1") return true;
  if (override === "0") return false;
  return process.env.NODE_ENV === "production";
}
/**
 * Verifies the platform `.node` and JS loader against `VENDOR.json`.
 * Throws `InitializationError` on mismatch so a tampered binary never
 * reaches the runtime.
 *
 * Fails closed: a platform with no manifest entry is rejected, because
 * the loader would otherwise be free to `require` a binary that was never
 * hashed (including its generic `./taskflow.node` fallback).
 *
 * @throws InitializationError when the manifest is missing or malformed,
 *   the current platform has no entry, or any digest differs.
 */
async function verifyVendorIntegrity(): Promise<void> {
  const vendorDir = await resolveVendorDir();
  const manifest = await loadManifest(vendorDir);
  const platformKey = `${process.platform}-${process.arch}`;
  const platform = manifest.platforms?.[platformKey];

  if (!platform) {
    throwIntegrityError(
      `VENDOR.json has no binary entry for ${platformKey}; refusing to load ` +
        "an unverified native binary. Set APPKIT_VERIFY_TASKFLOW_VENDOR=0 " +
        "to skip verification for locally built binaries.",
    );
  }

  await verifyFile(
    join(vendorDir, platform.file),
    platform.sha256,
    `binary (${platformKey})`,
  );
  if (manifest.loader) {
    await verifyFile(
      join(vendorDir, manifest.loader.file),
      manifest.loader.sha256,
      "loader",
    );
  }
}

/**
 * Locates the vendor directory at runtime. The folder sits at a
 * different relative depth in source vs. published builds, so we
 * probe the known candidates and return the first that exists.
 *
 * @throws InitializationError when no candidate contains `VENDOR.json`.
 */
async function resolveVendorDir(): Promise<string> {
  const here = dirname(fileURLToPath(import.meta.url));
  const candidates = [
    join(here, "..", "..", "vendor", "taskflow"),
    join(here, "..", "appkit", "vendor", "taskflow"),
  ];
  for (const candidate of candidates) {
    if (await exists(join(candidate, "VENDOR.json"))) return candidate;
  }
  throwIntegrityError(
    `VENDOR.json not found under any of: ${candidates.join(", ")}`,
  );
}

/** Resolves to `true` when `stat` succeeds for `path`, `false` on any error. */
async function exists(path: string): Promise<boolean> {
  try {
    await stat(path);
    return true;
  } catch {
    return false;
  }
}

/**
 * Reads and parses `VENDOR.json` from `vendorDir`.
 *
 * @throws InitializationError when the file is not valid JSON or its top
 *   level is not an object, so a corrupted manifest is reported as an
 *   integrity failure rather than as a missing binary.
 */
async function loadManifest(vendorDir: string): Promise<VendorManifest> {
  const raw = await readFile(join(vendorDir, "VENDOR.json"), "utf8");
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    const detail = err instanceof Error ? err.message : String(err);
    throwIntegrityError(`VENDOR.json is not valid JSON: ${detail}`);
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throwIntegrityError("VENDOR.json must contain a JSON object.");
  }
  return parsed as VendorManifest;
}

/**
 * Hashes the file at `path` with SHA-256 and compares it to `expected`.
 *
 * @param path - Absolute path of the file to hash.
 * @param expected - Hex digest from the manifest; compared case-insensitively.
 * @param label - Human-readable name used in the error message.
 * @throws InitializationError when the digests differ.
 */
async function verifyFile(
  path: string,
  expected: string,
  label: string,
): Promise<void> {
  const buf = await readFile(path);
  const actual = createHash("sha256").update(buf).digest("hex");
  // `digest("hex")` is lowercase; normalise the manifest value so an
  // uppercase or padded digest is not reported as tampering.
  const wanted = expected.trim().toLowerCase();
  if (actual !== wanted) {
    throwIntegrityError(
      `${label} sha256 mismatch.\n expected: ${expected}\n actual: ${actual}`,
    );
  }
}

/** Raises the `InitializationError` used for every integrity failure. */
function throwIntegrityError(reason: string): never {
  throw new InitializationError(
    `TaskFlow vendor integrity check failed: ${reason}`,
  );
}
Binary files /dev/null and b/packages/appkit/vendor/taskflow/taskflow-linux-x64.node differ diff --git a/packages/appkit/vendor/taskflow/taskflow.d.ts b/packages/appkit/vendor/taskflow/taskflow.d.ts new file mode 100644 index 00000000..dc0b86fc --- /dev/null +++ b/packages/appkit/vendor/taskflow/taskflow.d.ts @@ -0,0 +1,279 @@ +export interface TaskHandle { + taskId: string; + idempotencyKey: string; +} + +export interface Task { + id: string; + name: string; + idempotencyKey: string; + userId: string | null; + status: string; + input: any; + result: any | null; + error: string | null; + createdAtMs: number; + startedAtMs: number | null; + completedAtMs: number | null; + attempt: number; + timeoutMs: number | null; + maxAttempts: number | null; +} + +export interface TaskEvent { + id: string; + taskId: string; + idempotencyKey: string; + seq: number; + eventType: string; + timestampMs: number; + payload: any; +} + +export interface StreamEvent { + streamSeq: number; + event: TaskEvent; +} + +export interface SubmitOptions { + /** + * Caller identity for ownership checks on later `reconnect` / `resume` / + * `stop`. The engine does not authenticate; the embedder must verify this + * value (e.g., from a session token) before passing it in. + */ + userId?: string; + /** + * Dedup strictness. Defaults to `at_least_once` (fast path; cache-backed). + * Use `at_most_once` for non-idempotent transactions; the engine then + * always queries storage before creating the task, sacrificing latency for + * stronger cross-pod uniqueness. + */ + executeMode?: 'at_least_once' | 'at_most_once'; + /** Per-attempt handler timeout. Falls back to `executor.defaultTimeoutMs`. */ + timeoutMs?: number; + /** + * Per-task retry budget (overrides `executor.retry.maxAttempts`). The + * count includes the first try, so `maxAttempts: 3` means up to 2 retries. + */ + maxAttempts?: number; + /** + * Live JS object passed through to the handler as `ctx.context`. 
Stored + * in the FFI sidecar (see `stream.contextSidecarCapacity`); released + * automatically when the executor exits. Never serialised: not durable + * across crashes; not visible to handlers re-spawned on a different pod + * by the recovery worker. + */ + context?: any; +} + +export interface RegisterTaskOptions { + /** + * Whether the recovery worker should re-spawn this task automatically + * after a stale heartbeat. Defaults to `true`. Set to `false` when the + * task should only be revived by an explicit external trigger + * (`engine.resume(...)`). + */ + autoRecover?: boolean; +} + +export interface ResumeOptions { + /** + * Ownership check: must equal the `userId` passed at submit time. + * Mismatch returns `null` (the engine never reveals existence to an + * unauthorised caller). The engine does not authenticate; the embedder + * must verify this value before passing it in. + */ + userId?: string; + /** + * Live JS object for the new attempt's `ctx.context`. Replaces the + * spawn-time context, which has already been released. See + * `SubmitOptions.context` for lifetime caveats. + */ + context?: any; +} + +export interface StopOptions { + /** + * Ownership check: must equal the `userId` passed at submit time. + * Mismatch returns `TaskNotFound` (the engine never reveals existence to + * an unauthorised caller). The engine does not authenticate; the + * embedder must verify this value before passing it in. + */ + userId?: string; + /** + * Human-readable reason persisted with the suspension event. Surfaced to + * every `subscribe` consumer and bounded by + * `MAX_SUSPENDED_REASON_LEN` (512 chars). Defaults to `"stopped via API"`. 
+ */ + reason?: string; +} + +export interface TaskContext { + readonly taskId: string; + readonly idempotencyKey: string; + readonly userId: string | null; + readonly attempt: number; + readonly previousEvents: TaskEvent[]; + readonly isRecovery: boolean; + readonly context: any | null; + + emit(eventType: string, payload?: any): Promise; + heartbeat(): Promise; +} + +export interface TaskDefinition { + name: string; + execute(input: TInput, ctx: TaskContext): Promise; + recover?(input: TInput, ctx: TaskContext): Promise; + autoRecover?: boolean; +} + +export interface TaskflowConfig { + engine?: { + walPath?: string; + flushIntervalMs?: number; + recoveryIntervalMs?: number; + staleThresholdMs?: number; + flushMaxBatchSize?: number; + recoveryMaxPerCycle?: number; + shutdownGracePeriodMs?: number; + shutdownDeadlineMs?: number; + recoveryTaskTimeoutMs?: number; + enableTestMode?: boolean; + }; + wal?: { + maxSegmentBytes?: number; + minSegmentsRetained?: number; + maxPendingWrites?: number; + maxBatchSize?: number; + }; + admission?: { + guard?: { globalRateLimit?: number; perUserRateLimit?: number }; + slots?: { globalLimit?: number; perUserLimit?: number }; + }; + executor?: { + retry?: { + maxAttempts?: number; + initialDelayMs?: number; + maxDelayMs?: number; + backoffMultiplier?: number; + }; + heartbeatIntervalMs?: number; + defaultTimeoutMs?: number; + }; + storage?: + | { + backend: 'sqlite'; + databasePath?: string; + maxConnections?: number; + connectionTimeoutMs?: number; + maxEventsPerTask?: number; + retry?: StorageRetryConfig; + } + | { + backend: 'lakebase'; + connectionString?: string; + maxConnections?: number; + connectionTimeoutMs?: number; + maxEventsPerTask?: number; + retry?: StorageRetryConfig; + }; + stream?: { + bufferCapacity?: number; + retentionMs?: number; + channelCapacity?: number; + reapIntervalMs?: number; + contextSidecarCapacity?: number; + }; +} + +export interface StorageRetryConfig { + maxRetries?: number; + initialDelayMs?: 
number; + maxDelayMs?: number; + backoffMultiplier?: number; +} + +// Low-level API: native `Engine` bindings. + +export declare class Engine { + static create(config?: TaskflowConfig): Promise; + registerTask(definition: TaskDefinition): void; + submit(name: string, input: any, options?: SubmitOptions): Promise; + reconnect(idempotencyKey: string, userId?: string): Promise; + /** + * Revive a `Suspended` task and start a fresh attempt. + * + * Returns `null` if the task does not exist, has been moved to a terminal + * state, is no longer `Suspended`, OR if the caller's `userId` does not + * match the submit-time owner — these cases are intentionally indistinguishable + * to prevent existence probing by unauthorised callers. Throws on + * underlying engine errors (storage, slot exhaustion). + */ + resume(idempotencyKey: string, options?: ResumeOptions): Promise; + /** + * Move a task into `Suspended` and raise the stop intent. + * + * For `Created` tasks this is a synchronous durable transition; for + * `Running` tasks the stop intent is honoured by the next heartbeat. + * **Naming note:** despite the verb, this emits a `suspended` event, not + * a `stopped` event — the JS-side method is named `stop` because that is + * the user-facing verb, but the underlying semantics are identical to + * `engine.suspend(...)`. Use `resume` to revive the task; use + * `cancelTask` for a true terminal cancellation. Idempotent. + * + * Throws `TaskNotFound` if the task does not exist or the caller's + * `userId` does not match the owner. + */ + stop(idempotencyKey: string, options?: StopOptions): Promise; + cancelTask(idempotencyKey: string): void; + subscribe(idempotencyKey: string, lastSeq?: number): AsyncIterableIterator; + shutdown(): Promise; + /** + * Test-only. Aborts the executor mid-run without writing a terminal + * event so a subsequent reconnect/recovery exercises the crash path. 
+ * Throws `TestModeDisabled` unless `engine.enableTestMode = true` in the + * config; production deployments must leave this disabled. + */ + simulateCrash(idempotencyKey: string): void; +} + +// High-level SDK: same shape as the Python `Taskflow` helpers. + +export declare class Taskflow { + constructor(config?: TaskflowConfig); + + static task(name: string, fn: (input: any, ctx: TaskContext) => Promise): typeof fn; + static task(name: string, options?: { + recover?: (input: any, ctx: TaskContext) => Promise; + autoRecover?: boolean; + }): (fn: (input: any, ctx: TaskContext) => Promise) => typeof fn; + + static start(name: string, input: any, userId?: string): Promise; + static start(name: string, input: any, options?: SubmitOptions): Promise; + /** See `Engine.resume`. */ + static resume(idempotencyKey: string, options?: ResumeOptions): Promise; + /** See `Engine.stop`. */ + static stop(idempotencyKey: string, options?: StopOptions): Promise; + static subscribe( + idempotencyKey: string, + lastSeq?: number, + userId?: string, + ): Promise>; + /** Test-only. See `Engine.simulateCrash`. */ + static simulateCrash(idempotencyKey: string): Promise; + static shutdown(): Promise; +} + +// Workflow primitives: opinionated helpers; callers may extend them. + +type StepFn = + (ctx: TaskContext, ...args: TArgs) => Promise; + +export declare namespace workflow { + function step( + fn: StepFn, + ): StepFn; + + function findEvent(ctx: TaskContext, eventType: string): TaskEvent | null; +} diff --git a/packages/appkit/vendor/taskflow/taskflow.js b/packages/appkit/vendor/taskflow/taskflow.js new file mode 100644 index 00000000..72cf11ab --- /dev/null +++ b/packages/appkit/vendor/taskflow/taskflow.js @@ -0,0 +1,36 @@ +import { createRequire } from "node:module"; + +const require = createRequire(import.meta.url); + +// Some consumers ship a single binary (`taskflow.node`) per app — they +// install platform-specific tarballs. Others (e.g. 
AppKit) vendor multiple +// binaries side by side and resolve at runtime via `${platform}-${arch}`. +// This loader supports both: it prefers the platform-specific name and +// falls back to the generic one. +const platform = `${process.platform}-${process.arch}`; +const candidates = [`./taskflow-${platform}.node`, "./taskflow.node"]; + +let native; +const errors = []; +for (const candidate of candidates) { + try { + native = require(candidate); + break; + } catch (err) { + errors.push(`${candidate}: ${err?.message ?? err}`); + } +} + +if (!native) { + const detail = errors.map((e) => ` - ${e}`).join("\n"); + throw new Error( + `[taskflow] No native binary found for ${platform}. Tried:\n${detail}\n` + + `If you build from source, run \`bin/build-nodejs.sh\` for your platform; ` + + `if you installed a published package, this platform is not in the prebuild matrix.`, + ); +} + +export const Engine = native.Engine; +export const Taskflow = native.Taskflow; +export const workflow = native.workflow; +export default native; diff --git a/template/_gitignore b/template/_gitignore index 23adbc24..39877c6a 100644 --- a/template/_gitignore +++ b/template/_gitignore @@ -6,6 +6,9 @@ build/ .env .databricks/ .smoke-test/ + +.appkit/ + test-results/ playwright-report/