Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ coverage
.turbo

.databricks

.appkit/
4 changes: 3 additions & 1 deletion apps/dev-playground/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ test-results/
playwright-report/

# Auto-generated types (endpoint-specific, varies per developer)
shared/appkit-types/serving.d.ts
shared/appkit-types/serving.d.ts

.appkit/
1 change: 1 addition & 0 deletions knip.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"**/*.generated.ts",
"**/*.example.tsx",
"**/*.css",
"packages/appkit/vendor/**",
"packages/appkit/src/plugins/vector-search/**",
"packages/appkit/src/plugin/index.ts",
"packages/appkit/src/plugin/to-plugin.ts",
Expand Down
11 changes: 8 additions & 3 deletions packages/appkit/src/core/appkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import {
TelemetryReporter,
} from "../internal-telemetry";
import { ResourceRegistry, ResourceType } from "../registry";
import type { TaskConfig } from "../tasks";
import type { TelemetryConfig } from "../telemetry";
import { registerGracefulShutdownHandlers } from "./graceful-shutdown";
import { isToolProvider, PluginContext } from "./plugin-context";
import { type ServiceManager, startCoreServices } from "./service-manager";

Expand Down Expand Up @@ -193,6 +195,7 @@ export class AppKit<TPlugins extends InputPluginMap> {
plugins?: T;
telemetry?: TelemetryConfig;
cache?: CacheConfig;
task?: TaskConfig | false;
client?: WorkspaceClient;
onPluginsReady?: (appkit: PluginMap<T>) => void | Promise<void>;
disableInternalTelemetry?: boolean;
Expand All @@ -201,6 +204,7 @@ export class AppKit<TPlugins extends InputPluginMap> {
const services = await startCoreServices({
telemetry: config?.telemetry,
cache: config?.cache,
task: config?.task,
});

try {
Expand Down Expand Up @@ -241,11 +245,11 @@ export class AppKit<TPlugins extends InputPluginMap> {

const serverPlugin = instance.#pluginInstances.server;
if (serverPlugin && typeof (serverPlugin as any).start === "function") {
await (serverPlugin as any).start({
shutdownCoreServices: () => services.stop(),
});
await (serverPlugin as any).start();
}

registerGracefulShutdownHandlers(serverPlugin, services);

return handle;
} catch (error) {
await services.stop();
Expand Down Expand Up @@ -322,6 +326,7 @@ export async function createApp<
plugins?: T;
telemetry?: TelemetryConfig;
cache?: CacheConfig;
task?: TaskConfig | false;
client?: WorkspaceClient;
onPluginsReady?: (appkit: PluginMap<T>) => void | Promise<void>;
disableInternalTelemetry?: boolean;
Expand Down
63 changes: 63 additions & 0 deletions packages/appkit/src/core/graceful-shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import type { BasePlugin } from "shared";
import { createLogger } from "../logging/logger";
import type { ServiceManager } from "./service-manager";

const log = createLogger("appkit.shutdown");

const FORCE_SHUTDOWN_MS = 15_000;

type GracefulServer = BasePlugin & {
gracefulClose?: () => Promise<void>;
};

/**
* SIGTERM/SIGINT: drain HTTP (if the server plugin exposes it), then stop core
* services. Force-exits after {@link FORCE_SHUTDOWN_MS} if stuck; repeated
* signals are ignored while a run is in flight.
*/
export function registerGracefulShutdownHandlers(
serverPlugin: BasePlugin | undefined,
services: ServiceManager,
): void {
let running = false;

const shutdown = async () => {
if (running) return;
running = true;

const forceExit = setTimeout(() => {
log.error(
"Shutdown timed out after %dms; forcing exit",
FORCE_SHUTDOWN_MS,
);
process.exit(1);
}, FORCE_SHUTDOWN_MS);
forceExit.unref();

try {
await maybeCloseServer(serverPlugin as GracefulServer | undefined);
await services.stop();
clearTimeout(forceExit);
process.exit(0);
} catch (err) {
log.error("Shutdown failed: %O", err);
clearTimeout(forceExit);
process.exit(1);
}
};

const onSignal = () => {
void shutdown();
};

process.on("SIGTERM", onSignal);
process.on("SIGINT", onSignal);
}

async function maybeCloseServer(
serverPlugin: GracefulServer | undefined,
): Promise<void> {
const close = serverPlugin?.gracefulClose;
if (typeof close !== "function") return;
await close.call(serverPlugin);
}
3 changes: 3 additions & 0 deletions packages/appkit/src/core/service-manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { CacheConfig } from "shared";
import { CacheManager } from "../cache";
import { createLogger } from "../logging";
import { type TaskConfig, TaskManager } from "../tasks";
import { type TelemetryConfig, TelemetryManager } from "../telemetry";

const logger = createLogger("services");
Expand Down Expand Up @@ -55,11 +56,13 @@ export class ServiceManager {
export async function startCoreServices(config: {
telemetry?: TelemetryConfig;
cache?: CacheConfig;
task?: TaskConfig | false;
}): Promise<ServiceManager> {
const services = new ServiceManager();
try {
services.add("telemetry", await TelemetryManager.boot(config.telemetry));
services.add("cache", await CacheManager.boot(config.cache));
services.add("task", await TaskManager.boot(config.task));
return services;
} catch (error) {
await services.stop();
Expand Down
187 changes: 187 additions & 0 deletions packages/appkit/src/core/tests/graceful-shutdown.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";

const { mockLogger } = vi.hoisted(() => ({
mockLogger: {
info: vi.fn(),
warn: vi.fn(),
debug: vi.fn(),
error: vi.fn(),
event: vi.fn(),
},
}));

vi.mock("../../logging/logger", () => ({
createLogger: () => mockLogger,
}));

import { registerGracefulShutdownHandlers } from "../graceful-shutdown";

type SignalListener = NodeJS.SignalsListener;

function makeServiceManager() {
return {
add: vi.fn(),
get: vi.fn().mockReturnValue(null),
stop: vi.fn(async () => {}),
} as unknown as import("../service-manager").ServiceManager & {
stop: ReturnType<typeof vi.fn>;
};
}

/**
* Snapshot SIGTERM/SIGINT listeners before each test and surgically remove
* anything that registered during the test. Avoids stepping on vitest's
* own signal handlers.
*/
function captureNewSignalListeners(): () => void {
const before = {
SIGTERM: new Set(process.listeners("SIGTERM")),
SIGINT: new Set(process.listeners("SIGINT")),
};
return () => {
for (const signal of ["SIGTERM", "SIGINT"] as const) {
for (const listener of process.listeners(signal)) {
if (!before[signal].has(listener)) {
process.removeListener(signal, listener as SignalListener);
}
}
}
};
}

function getRegisteredHandler(signal: "SIGTERM" | "SIGINT"): SignalListener {
const listeners = process.listeners(signal);
if (listeners.length === 0) {
throw new Error(`No ${signal} listener registered`);
}
return listeners[listeners.length - 1] as SignalListener;
}

describe("registerGracefulShutdownHandlers", () => {
let exitSpy: ReturnType<typeof vi.fn>;
let cleanupListeners: () => void;

beforeEach(() => {
mockLogger.error.mockClear();
cleanupListeners = captureNewSignalListeners();
exitSpy = vi.fn();
vi.spyOn(process, "exit").mockImplementation(((code?: number) => {
exitSpy(code);
return undefined as never;
}) as never);
});

afterEach(() => {
cleanupListeners();
vi.useRealTimers();
vi.restoreAllMocks();
});

test("registers both SIGTERM and SIGINT handlers", () => {
const services = makeServiceManager();
registerGracefulShutdownHandlers(undefined, services);

expect(() => getRegisteredHandler("SIGTERM")).not.toThrow();
expect(() => getRegisteredHandler("SIGINT")).not.toThrow();
});

test("drains server, stops services in order, exits with 0", async () => {
const gracefulClose = vi.fn(async () => {});
const serverPlugin = { name: "server", gracefulClose } as never;
const services = makeServiceManager();

registerGracefulShutdownHandlers(serverPlugin, services);
getRegisteredHandler("SIGTERM")("SIGTERM");

// Yield twice: once for the async shutdown function to start, once for
// its awaits (close + stop) to resolve through their microtask queues.
await new Promise((r) => setImmediate(r));
await new Promise((r) => setImmediate(r));

expect(gracefulClose).toHaveBeenCalledTimes(1);
expect(services.stop).toHaveBeenCalledTimes(1);
expect(gracefulClose.mock.invocationCallOrder[0]).toBeLessThan(
(services.stop as ReturnType<typeof vi.fn>).mock.invocationCallOrder[0],
);
expect(exitSpy).toHaveBeenCalledWith(0);
});

test("server plugin without gracefulClose is skipped, services still stop", async () => {
const services = makeServiceManager();
const pluginWithoutClose = { name: "noop" } as never;

registerGracefulShutdownHandlers(pluginWithoutClose, services);
getRegisteredHandler("SIGTERM")("SIGTERM");

await new Promise((r) => setImmediate(r));
await new Promise((r) => setImmediate(r));

expect(services.stop).toHaveBeenCalledTimes(1);
expect(exitSpy).toHaveBeenCalledWith(0);
});

test("repeated signals during shutdown are coalesced", async () => {
let resolveClose!: () => void;
const gracefulClose = vi.fn(
() =>
new Promise<void>((r) => {
resolveClose = r;
}),
);
const services = makeServiceManager();
const serverPlugin = { name: "server", gracefulClose } as never;

registerGracefulShutdownHandlers(serverPlugin, services);
const handler = getRegisteredHandler("SIGTERM");

handler("SIGTERM");
handler("SIGTERM");
handler("SIGTERM");
await new Promise((r) => setImmediate(r));

expect(gracefulClose).toHaveBeenCalledTimes(1);

resolveClose();
await new Promise((r) => setImmediate(r));
await new Promise((r) => setImmediate(r));
expect(services.stop).toHaveBeenCalledTimes(1);
});

test("services.stop() failure logs and exits with 1", async () => {
const services = makeServiceManager();
(services.stop as ReturnType<typeof vi.fn>).mockRejectedValueOnce(
new Error("boom"),
);

registerGracefulShutdownHandlers(undefined, services);
getRegisteredHandler("SIGTERM")("SIGTERM");

await new Promise((r) => setImmediate(r));
await new Promise((r) => setImmediate(r));

expect(mockLogger.error).toHaveBeenCalledWith(
"Shutdown failed: %O",
expect.any(Error),
);
expect(exitSpy).toHaveBeenCalledWith(1);
});

test("force-exits when shutdown hangs past FORCE_SHUTDOWN_MS", async () => {
vi.useFakeTimers();
const gracefulClose = vi.fn(() => new Promise<void>(() => {}));
const services = makeServiceManager();
const serverPlugin = { name: "server", gracefulClose } as never;

registerGracefulShutdownHandlers(serverPlugin, services);
getRegisteredHandler("SIGTERM")("SIGTERM");

await Promise.resolve();
vi.advanceTimersByTime(15_000);

expect(exitSpy).toHaveBeenCalledWith(1);
expect(mockLogger.error).toHaveBeenCalledWith(
expect.stringContaining("Shutdown timed out"),
15_000,
);
});
});
11 changes: 11 additions & 0 deletions packages/appkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ export type {
StreamExecutionSettings,
} from "shared";
export { isSQLTypeMarker, sql } from "shared";
export type {
ResumeOptions,
StopOptions,
StreamEvent,
SubmitOptions,
Task,
TaskEvent,
TaskHandle,
} from "../vendor/taskflow/taskflow.js";
export { CacheManager } from "./cache";
export type { JobsConnectorConfig } from "./connectors/jobs";
export type {
Expand Down Expand Up @@ -96,6 +105,8 @@ export {
ResourceRegistry,
ResourceType,
} from "./registry";
export type { TaskConfig } from "./tasks";
export { TaskManager } from "./tasks";
// Telemetry (for advanced custom telemetry)
export {
type Counter,
Expand Down
Loading