diff --git a/CLAUDE.md b/CLAUDE.md index 57e89cb9..771beb38 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -116,6 +116,7 @@ These are the things that aren't visually loud in either code or docs: - **Don't wrap things "just in case."** No backwards-compat shims, no feature flags for hypothetical use cases, no validation at internal boundaries. - **Match existing structure.** New engine concerns go under `packages/engine/src/`; cross-cutting types belong in `@sidequest/core`. Don't create a new package for a small piece. - **Tests live next to the code.** `foo.ts` + `foo.test.ts` in the same folder; integration tests under `tests/integration/`. Backend changes must keep `@sidequest/backend-test` green for every driver. +- **JSDoc every exported entity.** `CONTRIBUTING.md` requires JSDoc-style docstrings on all exports; match that when adding public API. - **Commits follow Conventional Commits** (commitlint + Husky enforce it). semantic-release publishes from `master`. ## Scope guard diff --git a/packages/core/src/job/abort-reason.test.ts b/packages/core/src/job/abort-reason.test.ts new file mode 100644 index 00000000..1aecbf4a --- /dev/null +++ b/packages/core/src/job/abort-reason.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, it } from "vitest"; +import { deserializeAbortReason, JobCanceled, JobTimeout, serializeAbortReason } from "./abort-reason"; + +describe("abort-reason", () => { + it("round-trips a JobTimeout reason through the wire form", () => { + const message = serializeAbortReason(new JobTimeout(1500)); + expect(message).toEqual({ kind: "timeout", timeoutMs: 1500 }); + + const reason = deserializeAbortReason(message); + expect(reason).toBeInstanceOf(JobTimeout); + expect((reason as JobTimeout).timeoutMs).toBe(1500); + }); + + it("round-trips a JobCanceled reason through the wire form", () => { + const message = serializeAbortReason(new JobCanceled()); + expect(message).toEqual({ kind: "canceled" }); + 
expect(deserializeAbortReason(message)).toBeInstanceOf(JobCanceled); + }); + + it("treats any non-timeout reason as canceled", () => { + expect(serializeAbortReason(new Error("boom"))).toEqual({ kind: "canceled" }); + expect(serializeAbortReason(undefined)).toEqual({ kind: "canceled" }); + }); +}); diff --git a/packages/core/src/job/abort-reason.ts b/packages/core/src/job/abort-reason.ts new file mode 100644 index 00000000..b9611981 --- /dev/null +++ b/packages/core/src/job/abort-reason.ts @@ -0,0 +1,54 @@ +/** + * Reason set on a job's `abortSignal` when the engine aborts a running job. + * + * Inspect `job.abortSignal.reason` inside `run` to tell why the job is being aborted, e.g. to log + * or clean up differently for a timeout vs an explicit cancellation. + */ +export type AbortReason = JobTimeout | JobCanceled; + +/** + * Set as the `abortSignal.reason` when a job is aborted because it exceeded its `timeout`. + */ +export class JobTimeout extends Error { + /** The timeout, in milliseconds, that was exceeded. */ + readonly timeoutMs: number; + + constructor(timeoutMs: number) { + super(`Job timed out after ${timeoutMs}ms`); + this.name = "JobTimeout"; + this.timeoutMs = timeoutMs; + } +} + +/** + * Set as the `abortSignal.reason` when a running job is aborted because it was canceled. + */ +export class JobCanceled extends Error { + constructor() { + super("Job was canceled"); + this.name = "JobCanceled"; + } +} + +/** + * Structured-clone-safe wire form of an {@link AbortReason}, used to convey the reason to a job + * running in a worker thread (a live {@link AbortSignal} cannot cross the thread boundary). + */ +export type AbortReasonMessage = { kind: "timeout"; timeoutMs: number } | { kind: "canceled" }; + +/** + * Encodes an abort reason into its wire form. Anything that is not a {@link JobTimeout} is treated + * as a cancellation. + * @param reason The abort reason (typically `signal.reason`). 
+ */ +export function serializeAbortReason(reason: unknown): AbortReasonMessage { + return reason instanceof JobTimeout ? { kind: "timeout", timeoutMs: reason.timeoutMs } : { kind: "canceled" }; +} + +/** + * Rebuilds the proper {@link AbortReason} instance from its wire form. + * @param message The wire-form message. + */ +export function deserializeAbortReason(message: AbortReasonMessage): AbortReason { + return message.kind === "timeout" ? new JobTimeout(message.timeoutMs) : new JobCanceled(); +} diff --git a/packages/core/src/job/index.ts b/packages/core/src/job/index.ts index 9fc50091..a9016069 100644 --- a/packages/core/src/job/index.ts +++ b/packages/core/src/job/index.ts @@ -1 +1,2 @@ +export * from "./abort-reason"; export * from "./job"; diff --git a/packages/core/src/job/job.test.ts b/packages/core/src/job/job.test.ts index 74706418..4124f5f1 100644 --- a/packages/core/src/job/job.test.ts +++ b/packages/core/src/job/job.test.ts @@ -37,6 +37,21 @@ describe("job.ts", () => { expect(transition.result).toBe("foo bar"); }); + it("exposes a non-aborting abortSignal by default", () => { + const job = new DummyJob(); + expect(job.abortSignal).toBeInstanceOf(AbortSignal); + expect(job.abortSignal.aborted).toBe(false); + }); + + it("injects the abort signal at runtime", () => { + const job = new DummyJob(); + const controller = new AbortController(); + job.injectAbortSignal(controller.signal); + expect(job.abortSignal).toBe(controller.signal); + controller.abort(); + expect(job.abortSignal.aborted).toBe(true); + }); + it("creates a fail transition", () => { const job = new DummyJob(); const transition = job.fail("error"); diff --git a/packages/core/src/job/job.ts b/packages/core/src/job/job.ts index e32bd9c7..59cf2730 100644 --- a/packages/core/src/job/job.ts +++ b/packages/core/src/job/job.ts @@ -79,6 +79,18 @@ export abstract class Job implements JobData { readonly backoff_strategy!: BackoffStrategy; readonly retry_delay!: number | null; + /** + * Signal that 
fires when the engine aborts this run (timeout or cancellation). Available inside + * `run`. Pass it to abort-aware APIs (e.g. `fetch(url, { signal: this.abortSignal })`) or check it + * cooperatively (`this.abortSignal.throwIfAborted()`, `this.abortSignal.aborted`). The reason is a + * `JobTimeout` or `JobCanceled` (see `abortSignal.reason`). + * + * In `runner: "inline"` mode this is the only way a running job can be stopped. In `"thread"` mode + * the worker is also terminated, so honoring the signal is optional (but enables graceful cleanup). + * Defaults to a signal that never aborts when no run is in progress. + */ + readonly abortSignal: AbortSignal = new AbortController().signal; + /** * Initializes the job and resolves its script path. */ @@ -102,6 +114,14 @@ export abstract class Job implements JobData { Object.assign(this, jobData); } + /** + * Injects the abort signal for this run into the job instance at runtime. + * @param signal The abort signal the engine controls for this execution. + */ + injectAbortSignal(signal: AbortSignal): void { + Object.assign(this, { abortSignal: signal }); + } + /** * The class name of this job. 
*/ diff --git a/packages/docs/.vitepress/config.mts b/packages/docs/.vitepress/config.mts index fdac912f..f341bd3b 100644 --- a/packages/docs/.vitepress/config.mts +++ b/packages/docs/.vitepress/config.mts @@ -99,6 +99,7 @@ export default defineConfig({ collapsed: false, items: [ { text: "Backends", link: "/backends" }, + { text: "Execution Modes", link: "/execution-modes" }, { text: "Graceful Shutdown", link: "/graceful-shutdown" }, { text: "Cleanup", link: "/cleanup" }, { text: "Manual Job Resolution", link: "/manual-resolution" }, diff --git a/packages/docs/getting-started/configuration.md b/packages/docs/getting-started/configuration.md index 8be70380..442b4f95 100644 --- a/packages/docs/getting-started/configuration.md +++ b/packages/docs/getting-started/configuration.md @@ -122,6 +122,9 @@ await Sidequest.start({ minThreads: 4, maxThreads: 8, idleWorkerTimeout: 10000, // 10 seconds + fork: true, // run the engine in a child process + runner: "thread", // "thread" (worker pool) or "inline" + abortGracePeriodMs: 0, // grace before force-killing a timed-out/canceled thread job // 4. Migration and startup skipMigration: false, @@ -179,32 +182,35 @@ await Sidequest.start({ ### Configuration Options -| Option | Description | Default | -| -------------------------------- | --------------------------------------------------------------------------------------------------------------------------------- | --------------------------- | -| `backend.driver` | Backend driver package name (SQLite, Postgres, MySQL, MongoDB) | `@sidequest/sqlite-backend` | -| `backend.config` | Backend-specific connection string or [Knex configuration object](https://knexjs.org/guide/#configuration-options) | `./sidequest.sqlite` | -| `dashboard.enabled` | Whether to enable the dashboard web interface | `true` | -| `dashboard.port` | Port for the dashboard web interface | `8678` | -| `dashboard.auth` | Basic auth configuration with `user` and `password`. If omitted, no auth is required. 
| `undefined` | -| `queues` | Array of queue configurations with name, concurrency, priority, and state | `[]` | -| `maxConcurrentJobs` | Maximum number of jobs processed simultaneously across all queues | `10` | -| `minThreads` | Minimum number of worker threads to use | Number of CPU cores | -| `maxThreads` | Maximum number of worker threads to use | `minThreads * 2` | -| `idleWorkerTimeout` | Timeout (milliseconds) for idle workers before they are terminated | `10000` (10 seconds) | -| `skipMigration` | Whether to skip database migration on startup | `false` | -| `releaseStaleJobsIntervalMin` | Frequency (minutes) for releasing stale jobs. Set to `false` to disable | `60` | -| `releaseStaleJobsMaxStaleMs` | Maximum age (milliseconds) for a running job to be considered stale | `600000` (10 minutes) | -| `releaseStaleJobsMaxClaimedMs` | Maximum age (milliseconds) for a claimed job to be considered stale | `60000` (1 minute) | -| `cleanupFinishedJobsIntervalMin` | Frequency (minutes) for cleaning up finished jobs. Set to `false` to disable | `60` | -| `cleanupFinishedJobsOlderThan` | Age (milliseconds) after which finished jobs are deleted | `2592000000` (30 days) | -| `logger.level` | Minimum log level (`debug`, `info`, `warn`, `error`) | `info` | -| `logger.json` | Whether to output logs in JSON format | `false` | -| `gracefulShutdown` | Whether to enable graceful shutdown handling | `true` | -| `jobDefaults` | Default values for new jobs. Used while enqueueing | `undefined` | -| `queueDefaults` | Default values for auto-created queues | `undefined` | -| `manualJobResolution` | Whether to manually resolve job classes. See [Manual Job Resolution](/production/manual-resolution) | `false` | -| `jobsFilePath` | Optional path to the file where job classes are exported. Ignored if `manualJobResolution` is `false`. | `undefined` | -| `jobPollingInterval` | Interval (milliseconds) for polling new jobs to process. 
Increase this number to reduce DB load at the cost of job start latency. | `100` (100 milliseconds) | +| Option | Description | Default | +| -------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------- | +| `backend.driver` | Backend driver package name (SQLite, Postgres, MySQL, MongoDB) | `@sidequest/sqlite-backend` | +| `backend.config` | Backend-specific connection string or [Knex configuration object](https://knexjs.org/guide/#configuration-options) | `./sidequest.sqlite` | +| `dashboard.enabled` | Whether to enable the dashboard web interface | `true` | +| `dashboard.port` | Port for the dashboard web interface | `8678` | +| `dashboard.auth` | Basic auth configuration with `user` and `password`. If omitted, no auth is required. | `undefined` | +| `queues` | Array of queue configurations with name, concurrency, priority, and state | `[]` | +| `maxConcurrentJobs` | Maximum number of jobs processed simultaneously across all queues | `10` | +| `fork` | Run the engine in a child process (crash isolation). Set `false` to run in-process. See [Execution Modes](/production/execution-modes#fork-process-isolation) | `true` | +| `runner` | How jobs run: `"thread"` (worker pool) or `"inline"` (current thread). See [Execution Modes](/production/execution-modes#runner-thread-pool-vs-inline) | `"thread"` | +| `abortGracePeriodMs` | Grace period (ms) before a timed-out/canceled job's worker **thread** is force-killed. `0` kills immediately. No effect with `runner: "inline"`. 
See [Execution Modes](/production/execution-modes#cooperative-timeout-and-cancellation) | `0` | +| `minThreads` | Minimum number of worker threads to use (`runner: "thread"` only) | Number of CPU cores | +| `maxThreads` | Maximum number of worker threads to use (`runner: "thread"` only) | `minThreads * 2` | +| `idleWorkerTimeout` | Timeout (milliseconds) for idle workers before they are terminated (`runner: "thread"` only) | `10000` (10 seconds) | +| `skipMigration` | Whether to skip database migration on startup | `false` | +| `releaseStaleJobsIntervalMin` | Frequency (minutes) for releasing stale jobs. Set to `false` to disable | `60` | +| `releaseStaleJobsMaxStaleMs` | Maximum age (milliseconds) for a running job to be considered stale | `600000` (10 minutes) | +| `releaseStaleJobsMaxClaimedMs` | Maximum age (milliseconds) for a claimed job to be considered stale | `60000` (1 minute) | +| `cleanupFinishedJobsIntervalMin` | Frequency (minutes) for cleaning up finished jobs. Set to `false` to disable | `60` | +| `cleanupFinishedJobsOlderThan` | Age (milliseconds) after which finished jobs are deleted | `2592000000` (30 days) | +| `logger.level` | Minimum log level (`debug`, `info`, `warn`, `error`) | `info` | +| `logger.json` | Whether to output logs in JSON format | `false` | +| `gracefulShutdown` | Whether to enable graceful shutdown handling | `true` | +| `jobDefaults` | Default values for new jobs. Used while enqueueing | `undefined` | +| `queueDefaults` | Default values for auto-created queues | `undefined` | +| `manualJobResolution` | Whether to manually resolve job classes. See [Manual Job Resolution](/production/manual-resolution) | `false` | +| `jobsFilePath` | Optional path to the file where job classes are exported. Ignored if `manualJobResolution` is `false`. | `undefined` | +| `jobPollingInterval` | Interval (milliseconds) for polling new jobs to process. Increase this number to reduce DB load at the cost of job start latency. 
| `100` (100 milliseconds) | ::: danger If `auth` is not configured and `dashboard: true` is enabled in production, the dashboard will be publicly accessible. This is a security risk and **not recommended**. diff --git a/packages/docs/guide/jobs/lifecycle.md b/packages/docs/guide/jobs/lifecycle.md index 304d8544..53ab0d5e 100644 --- a/packages/docs/guide/jobs/lifecycle.md +++ b/packages/docs/guide/jobs/lifecycle.md @@ -67,7 +67,11 @@ Jobs can be manually canceled at any point before completion: - Waiting jobs are immediately marked as `canceled` - Claimed jobs are marked as `canceled` before execution an are prevented from running -- Running jobs receive a cancellation signal and transition to `canceled` +- Running jobs receive a cancellation signal via `this.abortSignal` and transition to `canceled` + +::: warning +Stopping a _running_ job depends on the [execution mode](/production/execution-modes#cooperative-timeout-and-cancellation). With the default thread pool the worker is terminated. In `runner: "inline"` mode the job cannot be force-stopped, so it must honor `this.abortSignal`; a running inline job that ignores it finishes with its own result instead of `canceled`. The same applies to job timeouts. 
+::: ## Best Practices diff --git a/packages/docs/guide/jobs/running.md b/packages/docs/guide/jobs/running.md index cf2e6c27..09a9fbe7 100644 --- a/packages/docs/guide/jobs/running.md +++ b/packages/docs/guide/jobs/running.md @@ -35,15 +35,16 @@ When you need finer control — fail without retrying, retry with a custom delay Before `run()` executes, Sidequest injects read-only properties onto `this`: -| Property | Type | Description | -| ------------------- | ----------- | -------------------------------- | -| `this.id` | `string` | Job ID | -| `this.attempt` | `number` | Current attempt number (1-based) | -| `this.max_attempts` | `number` | Maximum allowed attempts | -| `this.queue` | `string` | Queue the job is running in | -| `this.state` | `string` | Current state (`"running"`) | -| `this.inserted_at` | `Date` | When the job was first enqueued | -| `this.args` | `unknown[]` | The run arguments | +| Property | Type | Description | +| ------------------- | ------------- | --------------------------------------------------------------------------------------------------- | +| `this.id` | `string` | Job ID | +| `this.attempt` | `number` | Current attempt number (1-based) | +| `this.max_attempts` | `number` | Maximum allowed attempts | +| `this.queue` | `string` | Queue the job is running in | +| `this.state` | `string` | Current state (`"running"`) | +| `this.inserted_at` | `Date` | When the job was first enqueued | +| `this.args` | `unknown[]` | The run arguments | +| `this.abortSignal` | `AbortSignal` | Aborts when the job times out or is canceled. See [below](#responding-to-timeout-and-cancellation). | ::: warning These properties are only available inside `run()`. They are `undefined` in the constructor. @@ -57,9 +58,10 @@ These methods let you explicitly transition the job to a specific lifecycle stat You must **`return`** the result of every flow control method. Calling one without returning it is a no-op — the transition won't happen. 
```typescript -this.fail("reason"); // ❌ does nothing +this.fail("reason"); // ❌ does nothing return this.fail("reason"); // ✅ transitions to failed ``` + ::: ### `return this.complete(result)` @@ -129,15 +131,42 @@ async run(payload: unknown) { Use `snooze` for time-based deferrals: rate limit windows, maintenance modes, business hours. +## Responding to timeout and cancellation + +When a job exceeds its `timeout`, or is canceled (via the dashboard or `Sidequest.job.cancel(id)`), Sidequest aborts `this.abortSignal`. Use it to stop your work promptly: + +```typescript +async run(url: string) { + // Pass it to any abort-aware API; it cancels automatically. + const res = await fetch(url, { signal: this.abortSignal }); + + // Or check it cooperatively in loops / between steps. + for (const item of await res.json()) { + this.abortSignal.throwIfAborted(); // throws if timed out / canceled + await process(item); + } +} +``` + +`this.abortSignal.reason` is a `JobTimeout` or `JobCanceled` (both exported from `sidequest`) so you can react differently to each. + +::: danger Whether the signal can actually stop the job depends on the execution mode + +- In the default thread pool the worker is terminated, so honoring the signal is optional. With `abortGracePeriodMs: 0` (the default) the worker is killed immediately and the signal is not delivered to the job; set a positive grace period to get a cooperative window for cleanup. +- In **`runner: "inline"` mode there is no way to forcibly stop a job.** If your job ignores `this.abortSignal`, timeouts and cancellation **will not stop it**: it runs to completion. Honoring the signal is mandatory for long-running inline jobs. + +See [Execution Modes](/production/execution-modes#cooperative-timeout-and-cancellation) for the full behavior across modes. 
+::: + ## Choosing the right method -| Situation | Use | -|---|---| -| Normal completion | `return result` or `return this.complete(result)` | -| Permanent, unrecoverable error | `return this.fail(reason)` | -| Transient error, controlled retry delay | `return this.retry(reason, delay)` | -| Not the right time — try again later | `return this.snooze(delay)` | -| Unexpected error — let Sidequest decide | `throw error` | +| Situation | Use | +| --------------------------------------- | ------------------------------------------------- | +| Normal completion | `return result` or `return this.complete(result)` | +| Permanent, unrecoverable error | `return this.fail(reason)` | +| Transient error, controlled retry delay | `return this.retry(reason, delay)` | +| Not the right time — try again later | `return this.snooze(delay)` | +| Unexpected error — let Sidequest decide | `throw error` | ## Best practices diff --git a/packages/docs/introduction/how-it-works.md b/packages/docs/introduction/how-it-works.md index bccf7c42..771b16c7 100644 --- a/packages/docs/introduction/how-it-works.md +++ b/packages/docs/introduction/how-it-works.md @@ -30,6 +30,10 @@ Your app process Because the engine is a separate process, a job that calls `process.exit()` or throws an unhandled exception will kill the engine process but **not your app**. The engine restarts automatically. +::: tip +This forked, worker-thread model is the default and the right choice for most deployments. For serverless runtimes, test suites, or framework integrations that need jobs to share live in-process state, you can run the engine in-process and/or run jobs inline. See [Execution Modes](/production/execution-modes). +::: + ## How jobs are claimed The Dispatcher polls the database at a configurable interval (default: **100 ms**). 
When it finds waiting jobs that fit within queue concurrency limits, it claims them atomically: diff --git a/packages/docs/production/execution-modes.md b/packages/docs/production/execution-modes.md new file mode 100644 index 00000000..5216ce97 --- /dev/null +++ b/packages/docs/production/execution-modes.md @@ -0,0 +1,194 @@ +--- +outline: deep +title: Execution Modes +description: Choose how and where Sidequest runs your jobs (forked vs in-process, thread pool vs inline) and how cooperative timeout/cancellation works. +--- + +# Execution Modes + +By default Sidequest runs your jobs with **two layers of isolation**: the engine runs in a forked child process, and each job runs in its own worker thread inside that process (see [How It Works](/introduction/how-it-works)). This is the most robust setup and what you want in most deployments. + +Some environments and integrations need a different trade-off. Two independent options let you change where and how jobs run: + +- [`fork`](#fork-process-isolation): run the engine in a child process (default) or in your application's process. +- [`runner`](#runner-thread-pool-vs-inline): run each job in a worker thread pool (default) or inline in the current thread. + +They are orthogonal: `fork` controls the **process**, `runner` controls the **thread**. A related option, [`abortGracePeriodMs`](#cooperative-timeout-and-cancellation), controls how timeouts and cancellations stop a running job. + +::: tip TL;DR +Keep the defaults (`fork: true`, `runner: "thread"`) unless you have a concrete reason not to. Reach for `inline` + `fork: false` for serverless, test suites, or framework integrations that need jobs to share live in-process state. 
+::: + +## `fork`: process isolation + +```typescript +await Sidequest.start({ fork: false }); // default: true +``` + +| Value | Where the engine runs | Crash isolation | +| ---------------- | -------------------------- | -------------------------------------------------------------------------------------------------- | +| `true` (default) | A `child_process.fork` | A job crash (or `process.exit()`) kills the fork, not your app. The engine restarts automatically. | +| `false` | Your application's process | No isolation. An uncaught error in job code can take down your app. | + +Use `fork: false` when: + +- You can't spawn child processes (many **serverless / edge** runtimes). +- You're running an **integration test** and want to avoid IPC and process teardown flakiness. +- Your jobs need access to **live, in-process state** that can't cross a process boundary, for example a dependency-injection container. + +## `runner`: thread pool vs inline + +```typescript +await Sidequest.start({ runner: "inline" }); // default: "thread" +``` + +| Value | How a job runs | CPU isolation | Can be force-stopped? | +| -------------------- | ------------------------------------------------------------------ | ------------- | ------------------------------------- | +| `"thread"` (default) | In a [piscina](https://github.com/piscinajs/piscina) worker thread | Yes | Yes (the worker thread is terminated) | +| `"inline"` | Directly in the current thread, no pool | No | **No** | + +With `runner: "thread"`, `minThreads` / `maxThreads` / `idleWorkerTimeout` size the pool, and a job can be forcibly stopped by terminating its worker thread. + +With `runner: "inline"`, there is no pool and no separate thread. This is required when jobs must reach state that lives in the current thread, and it's handy for single-process setups. 
But it comes with two important consequences: + +::: warning Inline jobs block the event loop +An inline job runs on the same thread as everything else in that process: the dispatcher, and your app too if `fork: false`. A **CPU-bound** inline job will starve all of it until it finishes. Keep inline jobs I/O-bound, or use the thread pool for heavy work. +::: + +::: danger Inline jobs cannot be forcibly stopped +There is no separate thread to terminate, so Sidequest **cannot** kill a running inline job. Timeouts and cancellation only work if the job **cooperates** with the abort signal (see [Cooperative timeout and cancellation](#cooperative-timeout-and-cancellation) below). A job that ignores the signal runs to completion no matter what. +::: + +## Choosing a combination + +`fork` and `runner` combine into four setups: + +| `fork` | `runner` | Crash isolation | CPU isolation | Typical use | +| ------- | -------- | ---------------- | ------------- | ----------------------------------------------------------------------- | +| `true` | `thread` | ✅ | ✅ | **Default.** Production. | +| `true` | `inline` | ✅ (engine fork) | ❌ | Lighter execution with crash isolation kept; e.g. SQLite single-writer. | +| `false` | `thread` | ❌ | ✅ | Run in-process but still isolate CPU per job. | +| `false` | `inline` | ❌ | ❌ | Serverless, tests, and integrations that need live in-process state. | + +::: code-group + +```typescript [Serverless / single-process] +// No child process, no worker threads: everything in one place. +await Sidequest.start({ + fork: false, + runner: "inline", + backend: { driver: "@sidequest/postgres-backend", config: process.env.DATABASE_URL }, +}); +``` + +```typescript [SQLite] +// SQLite is single-writer; running jobs inline avoids cross-thread write contention. 
+await Sidequest.start({ + runner: "inline", + maxConcurrentJobs: 1, + backend: { driver: "@sidequest/sqlite-backend", config: "./jobs.sqlite" }, +}); +``` + +```typescript [Integration tests] +await Sidequest.start({ + fork: false, // no IPC to wait on + runner: "inline", // deterministic, in-process execution + backend: { driver: "@sidequest/sqlite-backend", config: ":memory:" }, +}); +``` + +::: + +::: warning SQLite and concurrency +SQLite allows a single writer. Concurrency above 1 against the same file leads to `SQLITE_BUSY`. Keep `maxConcurrentJobs: 1`, use a separate `.sqlite` file from your app, or use a server database (Postgres/MySQL) for real concurrency. This is independent of the execution mode. +::: + +## Cooperative timeout and cancellation + +A job is stopped early in two cases: it exceeds its `timeout`, or it is canceled (via the dashboard or `Sidequest.job.cancel(id)`). How that actually stops the job depends on the mode. + +Sidequest hands every job an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) at `this.abortSignal`. When a timeout or cancellation fires, that signal aborts. Your job can observe it and stop: + +```typescript +import { Job } from "sidequest"; + +export class SyncContactsJob extends Job { + async run(accountId: string) { + // 1. Hand the signal to anything that accepts one; it aborts automatically. + const res = await fetch(`https://api.example.com/${accountId}/contacts`, { + signal: this.abortSignal, + }); + const contacts = await res.json(); + + // 2. For long loops or CPU work, check it cooperatively. + for (const contact of contacts) { + this.abortSignal.throwIfAborted(); // bail out promptly on timeout/cancel + await upsert(contact); + } + + return this.complete({ synced: contacts.length }); + } +} +``` + +`this.abortSignal.reason` tells you _why_ it aborted. 
It is a `JobTimeout` or a `JobCanceled`: + +```typescript +import { JobTimeout, JobCanceled } from "sidequest"; + +this.abortSignal.addEventListener("abort", () => { + const reason = this.abortSignal.reason; + if (reason instanceof JobTimeout) { + // exceeded `timeout` + } else if (reason instanceof JobCanceled) { + // canceled by an operator + } +}); +``` + +### When does the job actually receive the signal? + +| Mode | Gets a live `abortSignal`? | If the job ignores it | +| ----------------------------------------------------- | --------------------------------- | --------------------------------------------- | +| `runner: "inline"` | **Always** | Runs to completion (cannot be force-stopped). | +| `runner: "thread"`, `abortGracePeriodMs: 0` (default) | No (worker is killed immediately) | Killed right away. | +| `runner: "thread"`, `abortGracePeriodMs > 0` | Yes, for the grace window | Killed after the grace period. | + +So in inline mode `this.abortSignal` is effectively mandatory for any long-running job: a job that does not honor it keeps running until it returns on its own (timeouts and cancellation cannot stop it). + +### `abortGracePeriodMs`: graceful kill for thread jobs + +```typescript +await Sidequest.start({ abortGracePeriodMs: 5000 }); // default: 0 +``` + +Applies only to `runner: "thread"`. It controls the window between _signaling_ an abort and _forcibly terminating_ the worker thread: + +- `0` (default): the worker is terminated immediately. The job is not given a chance to react, and `this.abortSignal` is not delivered to it. This is the historical behavior. +- `> 0`: the abort is delivered to the job via `this.abortSignal` first; if the job has not finished after this many milliseconds, the worker thread is terminated. Use this to let thread jobs clean up (close handles, flush buffers) before being killed. + +::: tip +A positive grace period allocates a small message channel per job to deliver the abort into the worker. 
The cost only applies while a grace period is configured, and only matters for the rare cancel/timeout. Leave it at `0` unless your thread jobs need graceful shutdown. +::: + +### What state does the job end in? + +The terminal state is decided when the run **actually ends**, never while it is still running (so a job is never re-queued while a copy of it is still in flight): + +| What happened | Terminal state | +| -------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------- | +| The job returned a value/transition (it finished) | Whatever the job returned (`completed`, `failed`, a retry, etc.). This holds **even if** a timeout/cancel was signaled but the job finished anyway. | +| The worker was hard-killed by a **timeout** (thread, no result) | Retried (or `failed` if no attempts remain). | +| The worker was hard-killed by a **cancellation** (thread, no result) | `canceled`. | +| The job threw an unexpected error | Retried (or `failed`). | + +::: warning Canceling a running inline job is best-effort +Because an inline job's result is respected once it returns, a running inline job that **ignores** a cancellation and finishes will be recorded with its own result (e.g. `completed`), not `canceled`. Cancellation of a _running_ inline job only takes effect if the job honors `this.abortSignal`. Canceling a **waiting** job always works (it is simply never claimed). 
+::: + +## Next steps + +- [Execution and Control](/guide/jobs/running): using `this.abortSignal` inside `run()` +- [Configuration reference](/getting-started/configuration): all engine options +- [Graceful Shutdown](/production/graceful-shutdown): draining jobs on shutdown diff --git a/packages/engine/src/engine.test.ts b/packages/engine/src/engine.test.ts index b21f5028..70ece959 100644 --- a/packages/engine/src/engine.test.ts +++ b/packages/engine/src/engine.test.ts @@ -24,6 +24,23 @@ vi.mock("child_process", () => ({ }), })); +// Mock the in-process worker runtime so the no-fork path doesn't run a real dispatcher loop. +const workerRuntimeMocks = vi.hoisted(() => ({ + start: vi.fn().mockResolvedValue(undefined), + shutdown: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("./workers/worker-runtime", () => ({ + WorkerRuntime: vi.fn(function () { + return { + start: workerRuntimeMocks.start, + shutdown: workerRuntimeMocks.shutdown, + }; + }), +})); + +import { fork } from "child_process"; + export class ParameterizedJob extends DummyJob { constructor( public param1: string, @@ -399,6 +416,26 @@ describe("Engine", () => { await engine.close(); }); + sidequestTest("runs in-process and skips fork when fork is false", async () => { + vi.mocked(fork).mockClear(); + workerRuntimeMocks.start.mockClear(); + workerRuntimeMocks.shutdown.mockClear(); + + const engine = new Engine(); + + await engine.start({ + backend: { driver: "@sidequest/sqlite-backend", config: ":memory:" }, + fork: false, + gracefulShutdown: false, + }); + + expect(workerRuntimeMocks.start).toHaveBeenCalledTimes(1); + expect(fork).not.toHaveBeenCalled(); + + await engine.close(); + expect(workerRuntimeMocks.shutdown).toHaveBeenCalledTimes(1); + }); + sidequestTest("should warn when starting already started engine", async () => { const engine = new Engine(); const config = { diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index cb8daf87..c53e5904 100644 --- 
a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -13,6 +13,7 @@ import { JobBuilder, JobBuilderDefaults } from "./job/job-builder"; import { grantQueueConfig, QueueDefaults } from "./queue/grant-queue-config"; import { findSidequestJobsScriptInParentDirs, resolveScriptPath } from "./shared-runner"; import { clearGracefulShutdown, gracefulShutdown } from "./utils/shutdown"; +import { WorkerRuntime } from "./workers/worker-runtime"; /** * Configuration options for the Sidequest engine. @@ -40,6 +41,45 @@ export interface EngineConfig { cleanupFinishedJobsOlderThan?: number; /** Whether to enable graceful shutdown handling. Defaults to `true` */ gracefulShutdown?: boolean; + /** + * Whether to run the engine in a forked child process. + * + * - `true` (default): the engine runs in a `child_process.fork`, isolating job-code crashes from + * the host application. + * - `false`: the engine runs in the host process. A crash in job code can take down the host, but + * jobs can reach live in-process state. Useful for single-process setups (serverless, tests) + * and required by framework integrations that rely on in-process execution. + * + * Defaults to `true`. + */ + fork?: boolean; + /** + * How jobs are executed. + * + * - `"thread"` (default): jobs run in a pool of worker threads (piscina). Gives CPU isolation + * and lets timeouts/cancellation forcibly abort a running job. + * - `"inline"`: jobs run in the current process/thread, with no worker pool. A running job cannot + * be forcibly terminated, so timeouts and cancellation are delivered cooperatively via + * `this.abortSignal` inside the job (the job must honor it to stop early); a job that ignores it + * runs to completion, and a CPU-bound job will block the event loop. Useful for single-process + * setups (serverless, tests, SQLite) and required when jobs need access to live in-process state. + * + * Defaults to `"thread"`. 
+ */ + runner?: "thread" | "inline"; + /** + * Grace period, in milliseconds, between cooperatively aborting a running job (timeout or + * cancellation) and forcibly terminating its worker thread. + * + * Only applies to `runner: "thread"`. When greater than `0`, the abort is first delivered to the + * job via `this.abortSignal` so it can stop and clean up; if it has not finished after this many + * milliseconds, the worker thread is terminated. When `0` (default), the worker is terminated + * immediately with no cooperative window, preserving the previous behavior. Has no effect in + * `runner: "inline"` (there is no thread to terminate). + * + * Defaults to `0`. + */ + abortGracePeriodMs?: number; /** Minimum number of worker threads to use. Defaults to number of CPUs */ minThreads?: number; /** Maximum number of worker threads to use. Defaults to `minThreads * 2` */ @@ -123,6 +163,12 @@ export class Engine { */ private mainWorker?: ChildProcess; + /** + * Worker runtime when the engine runs in-process (`fork: false`). + * Mutually exclusive with {@link mainWorker}. + */ + private inProcessRuntime?: WorkerRuntime; + /** * Flag indicating whether the engine is currently shutting down. * This is used to prevent multiple shutdown attempts and ensure graceful shutdown behavior. @@ -155,6 +201,9 @@ export class Engine { json: config?.logger?.json ?? false, }, gracefulShutdown: config?.gracefulShutdown ?? true, + fork: config?.fork ?? true, + runner: config?.runner ?? "thread", + abortGracePeriodMs: config?.abortGracePeriodMs ?? 0, minThreads: config?.minThreads ?? cpus().length, maxThreads: config?.maxThreads ?? cpus().length * 2, idleWorkerTimeout: config?.idleWorkerTimeout ?? 10_000, @@ -241,7 +290,7 @@ export class Engine { * @param config Optional configuration object. 
*/ async start(config: EngineConfig): Promise<void> { - if (this.mainWorker) { + if (this.mainWorker || this.inProcessRuntime) { logger("Engine").warn("Sidequest engine already started"); return; } @@ -256,6 +305,14 @@ export class Engine { } } + if (!nonNullConfig.fork) { + logger("Engine").info("Starting Sidequest in-process (fork disabled)"); + this.inProcessRuntime = new WorkerRuntime(dependencyRegistry.get(Dependency.Backend)!, nonNullConfig); + await this.inProcessRuntime.start(); + gracefulShutdown(this.close.bind(this), "Engine", nonNullConfig.gracefulShutdown); + return; + } + return new Promise((resolve, reject) => { const timeout = setTimeout(() => { reject(new Error("Timeout on starting sidequest fork!")); @@ -323,12 +380,16 @@ export class Engine { this.mainWorker.send({ type: "shutdown" }); await promise; } + if (this.inProcessRuntime) { + await this.inProcessRuntime.shutdown(); + } try { await dependencyRegistry.get(Dependency.Backend)?.close(); } catch (error) { logger("Engine").error("Error closing backend:", error); } this.mainWorker = undefined; + this.inProcessRuntime = undefined; // Reset the shutting down flag after closing // This allows the engine to be reconfigured or restarted later clearGracefulShutdown(); diff --git a/packages/engine/src/execution/dispatcher.ts b/packages/engine/src/execution/dispatcher.ts index 493fe82c..eec75ba3 100644 --- a/packages/engine/src/execution/dispatcher.ts +++ b/packages/engine/src/execution/dispatcher.ts @@ -60,8 +60,11 @@ export class Dispatcher { // because the execution is not awaited. This way we ensure that available slots // are correctly calculated. this.executorManager.queueJob(queue, job); - // does not await for job execution. - void this.executorManager.execute(queue, job); + // does not await for job execution. Guard against any unexpected rejection so a single + // job can never crash the engine with an unhandled promise rejection.
+ void this.executorManager.execute(queue, job).catch((error: unknown) => { + logger("Dispatcher").error(`Unexpected error executing job ${job.id}:`, error); + }); } } diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts index 0565887f..850bfb78 100644 --- a/packages/engine/src/execution/executor-manager.test.ts +++ b/packages/engine/src/execution/executor-manager.test.ts @@ -1,13 +1,20 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; import { Backend } from "@sidequest/backend"; -import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core"; -import EventEmitter from "events"; +import { + CancelTransition, + CompletedResult, + CompleteTransition, + JobData, + RetryTransition, + RunTransition, +} from "@sidequest/core"; import { JobTransitioner } from "../job/job-transitioner"; import { grantQueueConfig } from "../queue/grant-queue-config"; import { DummyJob } from "../test-jobs/dummy-job"; import { ExecutorManager } from "./executor-manager"; const runMock = vi.hoisted(() => vi.fn()); +const inlineRunMock = vi.hoisted(() => vi.fn()); vi.mock("../shared-runner", () => ({ RunnerPool: vi.fn().mockImplementation(function () { @@ -16,6 +23,12 @@ vi.mock("../shared-runner", () => ({ destroy: vi.fn(), }; }), + InlineRunner: vi.fn().mockImplementation(function () { + return { + run: inlineRunMock, + destroy: vi.fn(), + }; + }), })); vi.mock("../job/job-transitioner", () => ({ @@ -65,12 +78,77 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); expect(executorManager.availableSlotsGlobal()).toEqual(10); await executorManager.destroy(); }); + sidequestTest("uses the inline runner 
when runner is 'inline'", async ({ backend, config }) => { + inlineRunMock.mockResolvedValue({ + __is_job_transition__: true, + type: "completed", + result: "result", + } satisfies CompletedResult); + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" }); + + await executorManager.execute(queryConfig, jobData); + + expect(inlineRunMock).toBeCalledWith(jobData, expect.any(AbortSignal)); + expect(runMock).not.toHaveBeenCalled(); + await executorManager.destroy(); + inlineRunMock.mockReset(); + }); + + sidequestTest( + "inline: a job that ignores the timeout completes instead of being retried", + async ({ backend, config }) => { + jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 20 }); + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" }); + + // The job ignores the abort signal and completes after the timeout has already fired. + inlineRunMock.mockImplementationOnce( + () => + new Promise((resolve) => + setTimeout(() => resolve({ __is_job_transition__: true, type: "completed", result: "done" }), 60), + ), + ); + + await executorManager.execute(queryConfig, jobData); + + // Timeout fired (signal aborted) but inline applies the job's own result: completed, not a retry. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition)); + await executorManager.destroy(); + inlineRunMock.mockReset(); + }, + ); + + sidequestTest( + "inline: a job that ignores cancellation completes (its result wins)", + async ({ backend, config }) => { + // Pre-cancel in the DB so the first cancellation poll observes it immediately. JobTransitioner is + // mocked here, so the RunTransition does not overwrite the persisted state. + jobData = await backend.updateJob({ ...jobData, state: "canceled" }); + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" }); + + inlineRunMock.mockResolvedValue({ __is_job_transition__: true, type: "completed", result: "done" }); + + await executorManager.execute(queryConfig, jobData); + + // Inline cannot force-stop the job; it completed, so its result is applied. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); + await executorManager.destroy(); + inlineRunMock.mockReset(); + }, + ); + sidequestTest("should abort job execution on job cancel", async ({ backend, config }) => { await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); @@ -78,9 +156,9 @@ describe("ExecutorManager", () => { const executorManager = new ExecutorManager(backend, config); let expectedPromise; - runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { const promise = new Promise((_, reject) => { - signal.on("abort", () => { + signal.addEventListener("abort", () => { reject(new Error("The task has been aborted")); }); }); @@ -95,7 +173,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).toHaveReturnedWith(expectedPromise); await expect(expectedPromise).rejects.toThrow("The task has been aborted"); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); @@ -109,6 +187,79 @@ describe("ExecutorManager", () => { await executorManager.destroy(); }); + sidequestTest("respects a job's returned result even after a cancel was signaled", async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // The job is canceled mid-run but ignores the abort signal and returns a completed result. 
+ runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { + await backend.updateJob({ ...job, state: "canceled" }); + while (!signal.aborted) { + await new Promise((r) => setTimeout(r, 50)); + } + return { __is_job_transition__: true, type: "completed", result: "result" } as CompletedResult; + }); + + await executorManager.execute(queryConfig, jobData); + + // The job returned a state, so it is respected: the completion is applied. + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); + + await executorManager.destroy(); + }); + + sidequestTest("a hard-killed canceled job transitions to canceled", async ({ backend, config }) => { + await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() }); + + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // The job is canceled and never produces a result (the worker is terminated -> the run rejects). + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { + await backend.updateJob({ ...job, state: "canceled" }); + return new Promise((_, reject) => { + signal.addEventListener("abort", () => reject(new Error("The task has been aborted"))); + }); + }); + + await executorManager.execute(queryConfig, jobData); + + // No result: the abort reason (canceled) decides the terminal state. 
+ // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition)); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition)); + + await executorManager.destroy(); + }); + + sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => { + const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 }); + const executorManager = new ExecutorManager(backend, config); + + // RunTransition succeeds; the terminal transition fails because the row was deleted mid-run. + // eslint-disable-next-line @typescript-eslint/unbound-method + vi.mocked(JobTransitioner.apply) + .mockImplementationOnce((_backend: Backend, job: JobData) => job) + .mockImplementationOnce(() => { + throw new Error("Cannot update job, not found."); + }); + runMock.mockResolvedValue({ + __is_job_transition__: true, + type: "completed", + result: "ok", + } satisfies CompletedResult); + + // The fire-and-forget executor must not reject, and must free the job from the active set. 
+ await expect(executorManager.execute(queryConfig, jobData)).resolves.toBeUndefined(); + expect(executorManager.totalActiveWorkers()).toBe(0); + + await executorManager.destroy(); + }); + sidequestTest("should abort job execution on timeout", async ({ backend, config }) => { jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 }); @@ -116,9 +267,9 @@ describe("ExecutorManager", () => { const executorManager = new ExecutorManager(backend, config); let expectedPromise; - runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => { + runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => { const promise = new Promise((_, reject) => { - signal.on("abort", () => { + signal.addEventListener("abort", () => { reject(new Error("The task has been aborted")); }); }); @@ -132,7 +283,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(runMock).toHaveReturnedWith(expectedPromise); await expect(expectedPromise).rejects.toThrow("The task has been aborted"); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); @@ -162,7 +313,7 @@ describe("ExecutorManager", () => { expect(executorManager.availableSlotsGlobal()).toEqual(9); await execPromise; - expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter)); + expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal)); expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1); expect(executorManager.availableSlotsGlobal()).toEqual(10); diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts index f97b357d..1468ba36 100644 --- a/packages/engine/src/execution/executor-manager.ts +++ b/packages/engine/src/execution/executor-manager.ts @@ -1,10 
+1,20 @@ import { Backend } from "@sidequest/backend"; -import { JobData, JobTransitionFactory, logger, QueueConfig, RetryTransition, RunTransition } from "@sidequest/core"; -import EventEmitter from "events"; +import { + CancelTransition, + JobCanceled, + JobData, + JobTimeout, + JobTransition, + JobTransitionFactory, + logger, + QueueConfig, + RetryTransition, + RunTransition, +} from "@sidequest/core"; import { inspect } from "util"; import { NonNullableEngineConfig } from "../engine"; import { JobTransitioner } from "../job/job-transitioner"; -import { RunnerPool } from "../shared-runner"; +import { InlineRunner, JobRunner, RunnerPool } from "../shared-runner"; /** * Manages job execution and worker concurrency for Sidequest. @@ -12,7 +22,7 @@ import { RunnerPool } from "../shared-runner"; export class ExecutorManager { private activeByQueue: Record>; private activeJobs: Set; - private runnerPool: RunnerPool; + private jobRunner: JobRunner; /** * Creates a new ExecutorManager. @@ -25,7 +35,10 @@ export class ExecutorManager { ) { this.activeByQueue = {}; this.activeJobs = new Set(); - this.runnerPool = new RunnerPool(this.nonNullConfig); + this.jobRunner = + this.nonNullConfig.runner === "inline" + ? new InlineRunner(this.nonNullConfig) + : new RunnerPool(this.nonNullConfig); } /** @@ -88,6 +101,8 @@ export class ExecutorManager { */ async execute(queueConfig: QueueConfig, job: JobData): Promise { let isRunning = false; + const controller = new AbortController(); + let timeoutHandle: ReturnType | undefined; try { logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`); // We call prepareJob here again to make sure the jobs are in the queues. 
@@ -97,57 +112,91 @@ export class ExecutorManager { job = await JobTransitioner.apply(this.backend, job, new RunTransition()); isRunning = true; - const signal = new EventEmitter(); const cancellationCheck = async () => { while (isRunning) { const watchedJob = await this.backend.getJob(job.id); - if (watchedJob!.state === "canceled") { - logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`); - signal.emit("abort"); + if (watchedJob?.state === "canceled") { + logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`); + controller.abort(new JobCanceled()); isRunning = false; return; } await new Promise((r) => setTimeout(r, 1000)); } }; - void cancellationCheck(); + void cancellationCheck().catch((error) => { + logger("Executor Manager").error(`Cancellation watcher for job ${job.id} failed:`, error); + }); logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`); - const runPromise = this.runnerPool.run(job, signal); + const runPromise = this.jobRunner.run(job, controller.signal); if (job.timeout) { - void new Promise(() => { - setTimeout(() => { - logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); - signal.emit("abort"); - void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`)); - }, job.timeout!); - }); + // Only signal the abort here. The terminal transition is decided when the run actually ends + // (resolve or reject) so a still-running job is never re-queued underneath itself. 
+ timeoutHandle = setTimeout(() => { + logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`); + controller.abort(new JobTimeout(job.timeout!)); + }, job.timeout); } const result = await runPromise; isRunning = false; - logger("Executor Manager").debug(`Job ${job.id} completed with result: ${inspect(result)}`); - const transition = JobTransitionFactory.create(result); - await JobTransitioner.apply(this.backend, job, transition); + // The job ran to a conclusion and returned a state (even if a timeout/cancel was signaled); + // respect it and transition accordingly. + logger("Executor Manager").debug(`Job ${job.id} settled with result: ${inspect(result)}`); + await this.applyTerminalTransition(job, JobTransitionFactory.create(result)); } catch (error: unknown) { isRunning = false; const err = error as Error; - if (err.message === "The task has been aborted") { - logger("Executor Manager").debug(`Job ${job.id} was aborted`); + if (controller.signal.aborted) { + // The run produced no result because the worker was hard-killed (thread). Only a clear + // cancellation maps to canceled; every other abort reason (timeout, or anything else) defaults + // to a retry as a failsafe. The rejection is logged so a real error during the abort is kept. + const reason: unknown = controller.signal.reason; + logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`); + const transition = + reason instanceof JobCanceled + ? new CancelTransition() + : new RetryTransition(reason instanceof Error ? 
reason : new Error(`Job aborted: ${String(reason)}`)); + await this.applyTerminalTransition(job, transition); } else { logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`); - await JobTransitioner.apply(this.backend, job, new RetryTransition(err)); + await this.applyTerminalTransition(job, new RetryTransition(err)); } } finally { isRunning = false; + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } this.activeByQueue[queueConfig.name].delete(job.id); this.activeJobs.delete(job.id); } } + /** + * Applies a job's final transition, tolerating the job row having disappeared. + * + * A job's row can be deleted while it runs (cleanup routine, an explicit delete, or a test + * truncating the table). Recording its terminal state is then impossible and safe to skip. This + * must never throw: `execute` is fire-and-forget, so an error here would surface as an unhandled + * rejection. + * + * @param job The job being finalized. + * @param transition The terminal transition to apply. + */ + private async applyTerminalTransition(job: JobData, transition: JobTransition): Promise<void> { + try { + await JobTransitioner.apply(this.backend, job, transition); + } catch (error) { + logger("Executor Manager").warn( + `Could not record terminal state for job ${job.id} (it may no longer exist): ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + /** * Destroys the runner pool and releases resources. */ @@ -155,13 +204,13 @@ export class ExecutorManager { await new Promise((resolve, reject) => { const checkJobs = () => { if (this.totalActiveWorkers() === 0) { - logger("ExecutorManager").info("All active jobs finished. Destroying runner pool."); + logger("ExecutorManager").info("All active jobs finished. Destroying runner."); try { - this.runnerPool.destroy(); - logger("ExecutorManager").debug("Runner pool destroyed. Returning."); + this.jobRunner.destroy(); + logger("ExecutorManager").debug("Runner destroyed.
Returning."); resolve(); } catch (error) { - logger("ExecutorManager").error("Error while destroying runner pool:", error); + logger("ExecutorManager").error("Error while destroying runner:", error); reject(error as Error); } } else { diff --git a/packages/engine/src/shared-runner/index.ts b/packages/engine/src/shared-runner/index.ts index e2d80d65..2112f5b0 100644 --- a/packages/engine/src/shared-runner/index.ts +++ b/packages/engine/src/shared-runner/index.ts @@ -1,4 +1,6 @@ import * as runner from "./runner"; +export * from "./inline-runner"; +export * from "./job-runner"; export * from "./manual-loader"; export * from "./runner-pool"; export { runner as run }; diff --git a/packages/engine/src/shared-runner/inline-runner.test.ts b/packages/engine/src/shared-runner/inline-runner.test.ts new file mode 100644 index 00000000..74bc540f --- /dev/null +++ b/packages/engine/src/shared-runner/inline-runner.test.ts @@ -0,0 +1,52 @@ +import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; +import { JobData } from "@sidequest/core"; +import { beforeEach, describe, expect, vi } from "vitest"; +import { DummyJob } from "../test-jobs/dummy-job"; +import { InlineRunner } from "./inline-runner"; + +// Spy on the runner module's default export so we can assert how the InlineRunner delegates to it. 
+vi.mock("./runner", async (importOriginal) => { + const original = await importOriginal(); + return { default: vi.fn(original.default), injectSidequestConfig: original.injectSidequestConfig }; +}); + +import run from "./runner"; + +describe("InlineRunner", () => { + let runner: InlineRunner; + let jobData: JobData; + + beforeEach(async ({ backend, config }) => { + vi.clearAllMocks(); + + const job = new DummyJob(); + await job.ready(); + + jobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + runner = new InlineRunner(config); + }); + + sidequestTest("runs the job in-process and returns its result", async () => { + const result = await runner.run(jobData); + expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); + }); + + sidequestTest("delegates to the runner with inline enabled and forwards the signal", async ({ config }) => { + const signal = new AbortController().signal; + await runner.run(jobData, signal); + expect(run).toHaveBeenCalledWith({ jobData, config, inline: true, signal }); + }); + + sidequestTest("destroy is a no-op and does not throw", () => { + expect(() => runner.destroy()).not.toThrow(); + }); +}); diff --git a/packages/engine/src/shared-runner/inline-runner.ts b/packages/engine/src/shared-runner/inline-runner.ts new file mode 100644 index 00000000..f5ceb0fc --- /dev/null +++ b/packages/engine/src/shared-runner/inline-runner.ts @@ -0,0 +1,40 @@ +import { JobData, JobResult, logger } from "@sidequest/core"; +import { NonNullableEngineConfig } from "../engine"; +import { JobRunner } from "./job-runner"; +import run from "./runner"; + +/** + * Runs jobs in the current process/thread instead of a worker thread pool. + * + * Used by the inline execution mode (`runner: "inline"`). 
Unlike {@link RunnerPool}, a running job + * cannot be forcibly aborted: cancellation and timeouts are best-effort only, and a CPU-bound job + * will block the event loop. In exchange, jobs run in the host process and can reach live + * in-process state (the basis for framework integrations like NestJS). + */ +export class InlineRunner implements JobRunner { + /** + * Creates a new InlineRunner. + * @param nonNullConfig The non-nullable engine configuration. + */ + constructor(private nonNullConfig: NonNullableEngineConfig) {} + + /** + * Runs a job in the current process. The abort signal is forwarded to the job (as + * `this.abortSignal`) so it can stop cooperatively: inline execution has no separate thread to + * terminate, so this is the only way timeouts and cancellation can take effect. + * @param job The job data to run. + * @param signal Abort signal handed to the job for cooperative cancellation. + * @returns A promise resolving to the job result. + */ + run(job: JobData, signal?: AbortSignal): Promise<JobResult> { + logger("InlineRunner").debug(`Running job ${job.id} inline`); + return run({ jobData: job, config: this.nonNullConfig, inline: true, signal }); + } + + /** + * Releases resources. No-op for the inline runner. + */ + destroy(): void { + // There is no pool to tear down. In-flight jobs are awaited by the ExecutorManager. + } +} diff --git a/packages/engine/src/shared-runner/job-runner.ts b/packages/engine/src/shared-runner/job-runner.ts new file mode 100644 index 00000000..14d851f0 --- /dev/null +++ b/packages/engine/src/shared-runner/job-runner.ts @@ -0,0 +1,23 @@ +import { JobData, JobResult } from "@sidequest/core"; + +/** + * Abstraction over how a claimed job is actually executed. + * + * Implemented by the thread-based {@link RunnerPool} (piscina worker pool) and the + * {@link InlineRunner} (same-process execution). The {@link ExecutorManager} picks one based on + * the engine's `runner` configuration.
+ */ +export interface JobRunner { + /** + * Runs a job and resolves with its result. + * @param job The job data to run. + * @param signal Abort signal for the run. The thread runner uses it to terminate the worker; the + * inline runner forwards it to the job so it can stop cooperatively. + */ + run(job: JobData, signal?: AbortSignal): Promise<JobResult>; + + /** + * Releases any resources held by the runner. + */ + destroy(): void; +} diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts index 8c4bba95..5059c9b4 100644 --- a/packages/engine/src/shared-runner/runner-pool.test.ts +++ b/packages/engine/src/shared-runner/runner-pool.test.ts @@ -1,6 +1,6 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { JobData } from "@sidequest/core"; -import EventEmitter from "events"; +import { AbortReasonMessage, JobData, JobTimeout } from "@sidequest/core"; +import { MessagePort } from "node:worker_threads"; import { beforeEach, describe, expect, vi } from "vitest"; import { DummyJob } from "../test-jobs/dummy-job"; import { RunnerPool } from "./runner-pool"; @@ -39,14 +39,65 @@ describe("RunnerPool", () => { pool = new RunnerPool(config); }); - sidequestTest("should call pool.run with job data", async ({ config }) => { - const emiter = new EventEmitter(); - const result = await pool.run(jobData, emiter); + sidequestTest("passes the abort signal straight to piscina when grace is 0", async ({ config }) => { + const signal = new AbortController().signal; + const result = await pool.run(jobData, signal); expect(result).toEqual({ type: "completed", result: "ok" }); - expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal: emiter }); + expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal }); }); + sidequestTest("rejects without running the job when the signal is already aborted", async () => { + const controller = new
AbortController(); + controller.abort(new Error("already gone")); + + await expect(pool.run(jobData, controller.signal)).rejects.toThrow("already gone"); + expect(piscinaMockInstance.run).not.toHaveBeenCalled(); + }); + + sidequestTest( + "with a grace period, delivers the abort over a port and hard-kills after the grace", + async ({ config }) => { + vi.useFakeTimers(); + try { + const gracePool = new RunnerPool({ ...config, abortGracePeriodMs: 1000 }); + + let capturedPort: MessagePort | undefined; + let hardKillSignal: AbortSignal | undefined; + piscinaMockInstance.run.mockImplementationOnce( + (value: { abortPort: MessagePort }, opts: { signal: AbortSignal }) => { + capturedPort = value.abortPort; + hardKillSignal = opts.signal; + // Resolve only once piscina is asked to terminate (hard kill). + return new Promise((resolve) => + opts.signal.addEventListener("abort", () => resolve({ type: "completed", result: "killed" })), + ); + }, + ); + + const controller = new AbortController(); + const runPromise = gracePool.run(jobData, controller.signal); + + const message = new Promise((resolve) => + capturedPort!.once("message", (m: AbortReasonMessage) => resolve(m)), + ); + + controller.abort(new JobTimeout(5000)); + + expect(await message).toEqual({ kind: "timeout", timeoutMs: 5000 }); + expect(hardKillSignal!.aborted).toBe(false); + + // Grace elapses -> piscina is asked to terminate the worker. 
+ await vi.advanceTimersByTimeAsync(1000); + expect(hardKillSignal!.aborted).toBe(true); + + await runPromise; + } finally { + vi.useRealTimers(); + } + }, + ); + sidequestTest("should call pool.destroy", () => { pool.destroy(); expect(piscinaMockInstance.destroy).toHaveBeenCalled(); diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts index ab17d3fe..1ed8b20b 100644 --- a/packages/engine/src/shared-runner/runner-pool.ts +++ b/packages/engine/src/shared-runner/runner-pool.ts @@ -1,13 +1,14 @@ -import { JobData, JobResult, logger } from "@sidequest/core"; -import EventEmitter from "events"; +import { JobData, JobResult, logger, serializeAbortReason } from "@sidequest/core"; +import { MessageChannel } from "node:worker_threads"; import Piscina from "piscina"; import { DEFAULT_RUNNER_PATH } from "../constants"; import { NonNullableEngineConfig } from "../engine"; +import { JobRunner } from "./job-runner"; /** * A pool of worker threads for running jobs in parallel using Piscina. */ -export class RunnerPool { +export class RunnerPool implements JobRunner { /** The underlying Piscina worker pool. */ private readonly pool: Piscina; @@ -29,13 +30,53 @@ export class RunnerPool { /** * Runs a job in the worker pool. + * + * With `abortGracePeriodMs === 0` (default), an abort terminates the worker immediately. With a + * positive grace period, the abort is delivered to the job cooperatively over a transferred port + * (so it can stop via `this.abortSignal`), and the worker is only forcibly terminated if it has + * not finished within the grace period. + * * @param job The job data to run. - * @param signal Optional event emitter for cancellation. + * @param signal Optional abort signal for cancellation/timeout. * @returns A promise resolving to the job result. 
*/ - run(job: JobData, signal?: EventEmitter): Promise { + run(job: JobData, signal?: AbortSignal): Promise { logger("RunnerPool").debug(`Running job ${job.id} in pool`); - return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + + // Already aborted before we could start (e.g. canceled between claim and dispatch): don't run it. + if (signal?.aborted) { + return Promise.reject(signal.reason instanceof Error ? signal.reason : new Error("Job aborted before execution")); + } + + const grace = this.nonNullConfig.abortGracePeriodMs; + + if (!signal || grace <= 0) { + // Abort terminates the worker immediately. + return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal }); + } + + // Deliver the abort cooperatively first, then hard-terminate after the grace period. + const channel = new MessageChannel(); + const hardKill = new AbortController(); + let graceTimer: ReturnType | undefined; + + const onAbort = () => { + channel.port1.postMessage(serializeAbortReason(signal.reason)); + graceTimer = setTimeout(() => hardKill.abort(), grace); + }; + + signal.addEventListener("abort", onAbort, { once: true }); + + return this.pool + .run( + { jobData: job, config: this.nonNullConfig, abortPort: channel.port2 }, + { transferList: [channel.port2], signal: hardKill.signal }, + ) + .finally(() => { + if (graceTimer) clearTimeout(graceTimer); + signal.removeEventListener("abort", onAbort); + channel.port1.close(); + }); } /** diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts index 555cd270..38378c8f 100644 --- a/packages/engine/src/shared-runner/runner.test.ts +++ b/packages/engine/src/shared-runner/runner.test.ts @@ -1,9 +1,11 @@ import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; -import { FailedResult, JobData } from "@sidequest/core"; +import { FailedResult, JobCanceled, JobData, JobTimeout } from "@sidequest/core"; import { existsSync, unlinkSync } from 
"node:fs"; import { unlink, writeFile } from "node:fs/promises"; import { join, resolve } from "node:path"; +import { MessageChannel } from "node:worker_threads"; import { vi } from "vitest"; +import { AbortAwareJob } from "../test-jobs/abort-aware-job"; import { DummyJob } from "../test-jobs/dummy-job"; import { DummyJobWithArgs } from "../test-jobs/dummy-job-with-args"; import { importSidequest } from "../utils/import"; @@ -52,6 +54,108 @@ describe("runner.ts", () => { expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); }); + sidequestTest("injects the config when not inline", async ({ config }) => { + vi.mocked(importSidequest).mockClear(); + await run({ jobData, config }); + expect(importSidequest).toHaveBeenCalled(); + }); + + sidequestTest("skips config injection when inline", async ({ config }) => { + vi.mocked(importSidequest).mockClear(); + const result = await run({ jobData, config, inline: true }); + expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" }); + expect(importSidequest).not.toHaveBeenCalled(); + }); + + sidequestTest("injects the abort signal into the job when provided", async ({ config }) => { + const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal"); + const signal = new AbortController().signal; + await run({ jobData, config, signal }); + expect(injectSpy).toHaveBeenCalledWith(signal); + injectSpy.mockRestore(); + }); + + sidequestTest("does not inject an abort signal when none is provided", async ({ config }) => { + const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal"); + await run({ jobData, config }); + expect(injectSpy).not.toHaveBeenCalled(); + injectSpy.mockRestore(); + }); + + sidequestTest("a job receives the abort signal and its reason and stops", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: 
job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + const resultPromise = run({ jobData: abortJobData, config, inline: true, signal: controller.signal }); + controller.abort(new JobCanceled()); + + // The job resolves once it observes the abort (whether before it starts or while awaiting it). + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("JobCanceled"); + }); + + sidequestTest("a job sees an already-aborted signal before it starts", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const controller = new AbortController(); + controller.abort(new JobTimeout(50)); + + const result = (await run({ + jobData: abortJobData, + config, + inline: true, + signal: controller.signal, + })) as FailedResult; + expect(result.type).toBe("failed"); + expect(result.error.message).toContain("aborted before start: JobTimeout"); + }); + + sidequestTest("a thread job is aborted via the abort port (rebuilds the reason)", async ({ backend, config }) => { + const job = new AbortAwareJob(); + await job.ready(); + const abortJobData = await backend.createNewJob({ + class: job.className, + script: job.script, + args: [], + attempt: 0, + queue: "default", + constructor_args: [], + state: "waiting", + }); + + const channel = new MessageChannel(); + // Thread path: no live signal, the abort arrives over the transferred port. 
+ const resultPromise = run({ jobData: abortJobData, config, abortPort: channel.port2 }); + channel.port1.postMessage({ kind: "timeout", timeoutMs: 1234 }); + + const result = (await resultPromise) as FailedResult; + expect(result.type).toBe("failed"); + // The worker reconstructs the JobTimeout reason from the port message. + expect(result.error.message).toContain("JobTimeout"); + channel.port1.close(); + }); + sidequestTest("fails with invalid script", async ({ config }) => { jobData.script = "invalid!"; const result = (await run({ jobData, config })) as FailedResult; diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts index 8e31abde..65219224 100644 --- a/packages/engine/src/shared-runner/runner.ts +++ b/packages/engine/src/shared-runner/runner.ts @@ -1,18 +1,66 @@ -import { Job, JobClassType, JobData, JobResult, logger, resolveScriptPathForJob, toErrorData } from "@sidequest/core"; +import { + AbortReasonMessage, + deserializeAbortReason, + Job, + JobClassType, + JobData, + JobResult, + logger, + resolveScriptPathForJob, + toErrorData, +} from "@sidequest/core"; import { existsSync } from "node:fs"; import { fileURLToPath } from "node:url"; +import { MessagePort } from "node:worker_threads"; import { EngineConfig } from "../engine"; import { importSidequest } from "../utils"; import { findSidequestJobsScriptInParentDirs, MANUAL_SCRIPT_TAG, resolveScriptPath } from "./manual-loader"; +/** + * Builds an {@link AbortSignal} for a worker-thread job from an abort port. + * + * The thread runner cannot receive a live `AbortSignal` across the worker boundary, so the engine + * transfers a {@link MessagePort} and posts an abort message on it; this turns that message into a + * local signal carrying the proper `JobTimeout`/`JobCanceled` reason. 
+ */ +function signalFromAbortPort(port: MessagePort): AbortSignal { + const controller = new AbortController(); + port.on("message", (message: AbortReasonMessage) => { + controller.abort(deserializeAbortReason(message)); + }); + return controller.signal; +} + /** * Runs a job by dynamically importing its script and executing the specified class. - * @param jobData The job data containing script and class information + * @param jobData The job data containing script and class information. * @param config The non-nullable engine configuration. + * @param inline Whether the job runs inline in the host process. When true, the Sidequest config is + * not re-injected (the host process is already configured). + * @param signal Abort signal handed to the job as `this.abortSignal` (used by the inline runner, + * which executes in the same process). + * @param abortPort Port the thread runner uses to receive the abort cooperatively across the worker + * boundary; it is turned into the job's `this.abortSignal`. Mutually exclusive with `signal`. * @returns A promise resolving to the job result. */ -export default async function run({ jobData, config }: { jobData: JobData; config: EngineConfig }): Promise { - await injectSidequestConfig(config); +export default async function run({ + jobData, + config, + inline, + signal, + abortPort, +}: { + jobData: JobData; + config: EngineConfig; + inline?: boolean; + signal?: AbortSignal; + abortPort?: MessagePort; +}): Promise { + // In inline mode the job runs in the host process, where Sidequest is already configured, so + // re-injecting the config is redundant. In a worker thread the module is fresh and needs it. 
+ if (!inline) { + await injectSidequestConfig(config); + } let script: Record = {}; try { @@ -65,9 +113,24 @@ export default async function run({ jobData, config }: { jobData: JobData; confi const job: Job = new JobClass(...jobData.constructor_args); job.injectJobData(jobData); + // Exactly one of these is provided: inline passes a live signal; the thread runner passes an abort + // port it turns into one. + let abortSignal: AbortSignal | undefined; + if (signal) { + abortSignal = signal; + } else if (abortPort) { + abortSignal = signalFromAbortPort(abortPort); + } + if (abortSignal) { + job.injectAbortSignal(abortSignal); + } logger("Runner").debug(`Executing job class "${jobData.class}" with args:`, jobData.args); - return job.perform(...jobData.args); + try { + return await job.perform(...jobData.args); + } finally { + abortPort?.close(); + } } /** diff --git a/packages/engine/src/test-jobs/abort-aware-job.js b/packages/engine/src/test-jobs/abort-aware-job.js new file mode 100644 index 00000000..bc9704de --- /dev/null +++ b/packages/engine/src/test-jobs/abort-aware-job.js @@ -0,0 +1,19 @@ +import { Job } from "@sidequest/core"; + +/** + * A job that honors `this.abortSignal`: it waits until aborted and reports the abort reason. + * Used to verify cooperative cancellation/timeout end-to-end through the runner. 
+ */ +export class AbortAwareJob extends Job { + async run() { + if (this.abortSignal.aborted) { + return this.fail(`aborted before start: ${this.abortSignal.reason?.name}`); + } + + await new Promise((resolve) => { + this.abortSignal.addEventListener("abort", resolve, { once: true }); + }); + + return this.fail(`aborted: ${this.abortSignal.reason?.name}`); + } +} diff --git a/packages/engine/src/workers/index.ts b/packages/engine/src/workers/index.ts index d66d2cdb..9a327bf0 100644 --- a/packages/engine/src/workers/index.ts +++ b/packages/engine/src/workers/index.ts @@ -1 +1,2 @@ export * from "./main"; +export * from "./worker-runtime"; diff --git a/packages/engine/src/workers/main.test.ts b/packages/engine/src/workers/main.test.ts index a5dedb5d..b193e525 100644 --- a/packages/engine/src/workers/main.test.ts +++ b/packages/engine/src/workers/main.test.ts @@ -4,8 +4,6 @@ import { randomUUID } from "node:crypto"; import { beforeEach, describe, expect, vi } from "vitest"; import { Dispatcher } from "../execution/dispatcher"; import { grantQueueConfig } from "../queue"; -import { cleanupFinishedJobs } from "../routines/cleanup-finished-job"; -import { releaseStaleJobs } from "../routines/release-stale-jobs"; import { MainWorker } from "./main"; const runMock = vi.hoisted(() => vi.fn()); @@ -20,7 +18,7 @@ vi.mock("../shared-runner", () => ({ })); const cronMocks = vi.hoisted(() => ({ - schedule: vi.fn().mockReturnValue({ execute: vi.fn() }), + schedule: vi.fn().mockReturnValue({ execute: vi.fn(), stop: vi.fn() }), })); vi.mock("node-cron", () => ({ @@ -60,53 +58,15 @@ describe("main.ts", () => { } await worker.runWorker(config); vi.resetAllMocks(); + // resetAllMocks clears the return value, so re-establish the scheduled-task shape used by + // WorkerRuntime (which calls task.stop() on shutdown). 
+ cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop: vi.fn() }); }); afterEach(async () => { await worker.shutdown(); }); - describe("startCron", () => { - sidequestTest("should schedule both cron jobs", async () => { - await worker.startCron(60, 600_000, 60_000, 60, 0); - - expect(cronMocks.schedule).toHaveBeenCalledTimes(2); - expect(cronMocks.schedule).toHaveBeenCalledWith("*/60 * * * *", expect.any(Function)); - }); - - sidequestTest("should call releaseStaleJobs when release cron executes", async () => { - await worker.startCron(60, 600_000, 60_000, 60, 0); - - const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; - - await cronCallback(); - - expect(releaseStaleJobs).toHaveBeenCalledWith(expect.any(Object), 600_000, 60_000); - }); - - sidequestTest("should call cleanupFinishedJobs when cleanup cron executes", async () => { - await worker.startCron(60, 600_000, 60_000, 60, 0); - - const cronCallback = cronMocks.schedule.mock.calls[1][1] as () => unknown; - - await cronCallback(); - - expect(cleanupFinishedJobs).toHaveBeenCalledWith(expect.any(Object), 0); - }); - - sidequestTest("should handle errors and log them when releaseStaleJobs fails", async () => { - const error = new Error("fail"); - (releaseStaleJobs as unknown as ReturnType).mockRejectedValueOnce(error); - - await worker.startCron(60, 600_000, 60_000, 60, 0); - - const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; - - await expect(cronCallback()).resolves.toBeUndefined(); - expect(releaseStaleJobs).toHaveBeenCalled(); - }); - }); - describe("runWorker", () => { sidequestTest("should call startCron after starting the worker", async ({ config }) => { const mockWorkerRun = vi.fn().mockResolvedValueOnce(undefined); diff --git a/packages/engine/src/workers/main.ts b/packages/engine/src/workers/main.ts index 85353b63..c0ea6347 100644 --- a/packages/engine/src/workers/main.ts +++ b/packages/engine/src/workers/main.ts @@ -1,20 +1,13 @@ -import { Backend 
} from "@sidequest/backend"; import { logger } from "@sidequest/core"; -import cron from "node-cron"; import { WORKER_PROCESS_FLAG } from "../constants"; import { Engine, EngineConfig, NonNullableEngineConfig } from "../engine"; -import { Dispatcher } from "../execution/dispatcher"; -import { ExecutorManager } from "../execution/executor-manager"; -import { QueueManager } from "../execution/queue-manager"; -import { cleanupFinishedJobs } from "../routines/cleanup-finished-job"; -import { releaseStaleJobs } from "../routines/release-stale-jobs"; import { gracefulShutdown } from "../utils/shutdown"; +import { WorkerRuntime } from "./worker-runtime"; export class MainWorker { shuttingDown = false; - private dispatcher: Dispatcher | undefined; + private runtime?: WorkerRuntime; private engine = new Engine(); - private backend?: Backend; /** * Starts a Sidequest worker process with the given configuration. @@ -24,23 +17,8 @@ export class MainWorker { if (!this.shuttingDown) { try { const nonNullConfig = await this.engine.configure({ ...sidequestConfig, skipMigration: true }); - this.backend = this.engine.getBackend()!; - - this.dispatcher = new Dispatcher( - this.backend, - new QueueManager(this.backend, nonNullConfig.queues, nonNullConfig.queueDefaults), - new ExecutorManager(this.backend, nonNullConfig), - nonNullConfig.jobPollingInterval, - ); - this.dispatcher.start(); - - await this.startCron( - nonNullConfig.releaseStaleJobsIntervalMin, - nonNullConfig.releaseStaleJobsMaxStaleMs, - nonNullConfig.releaseStaleJobsMaxClaimedMs, - nonNullConfig.cleanupFinishedJobsIntervalMin, - nonNullConfig.cleanupFinishedJobsOlderThan, - ); + this.runtime = new WorkerRuntime(this.engine.getBackend()!, nonNullConfig); + await this.runtime.start(); } catch (error) { logger("Worker").error(error); process.exit(1); @@ -56,91 +34,13 @@ export class MainWorker { async shutdown() { if (!this.shuttingDown) { this.shuttingDown = true; - logger("Worker").debug("Shutting down dispatcher"); - 
await this.dispatcher?.stop(); + logger("Worker").debug("Shutting down worker runtime"); + await this.runtime?.shutdown(); logger("Worker").debug("Shutting down engine"); await this.engine.close(); logger("Worker").debug("Main worker completely shut down"); } } - - /** - * Starts cron job for releasing stale jobs. - * Also executes the task immediately. - */ - async startAndExecuteStaleJobsReleaseCron( - intervalMin: number, - maxStaleMs: number, - maxClaimedMs: number, - ): Promise { - if (!this.backend) { - throw new Error("Backend is not initialized. Cannot start stale jobs release cron."); - } - - logger("Worker").debug(`Starting stale jobs release cron with interval: ${intervalMin} minutes`); - const releaseTask = cron.schedule(`*/${intervalMin} * * * *`, async () => { - try { - logger("Worker").debug("Running stale jobs release task"); - await releaseStaleJobs(this.backend!, maxStaleMs, maxClaimedMs); - } catch (error: unknown) { - logger("Worker").error("Error on running ReleaseStaleJob!", error); - } - }); - return releaseTask.execute(); - } - - /** - * Starts cron job for cleaning up finished jobs. - * Also executes the task immediately. - */ - async startAndExecuteFinishedJobsCleanupCron(intervalMin: number, cutoffMs: number): Promise { - if (!this.backend) { - throw new Error("Backend is not initialized. Cannot start finished jobs cleanup cron."); - } - - logger("Worker").debug(`Starting finished jobs cleanup cron with interval: ${intervalMin} minutes`); - const cleanupTask = cron.schedule(`*/${intervalMin} * * * *`, async () => { - try { - logger("Worker").debug("Running finished jobs cleanup task"); - await cleanupFinishedJobs(this.backend!, cutoffMs); - } catch (error: unknown) { - logger("Worker").error("Error on running CleanupJob!", error); - } - }); - return cleanupTask.execute(); - } - - /** - * Starts cron jobs for releasing stale jobs and cleaning up finished jobs. 
- * - * @param staleIntervalMin Interval in minutes for releasing stale jobs, or false to disable. - * @param maxStaleMs Maximum age in milliseconds for stale jobs. - * @param maxClaimedMs Maximum age in milliseconds for claimed jobs. - * @param cleanupIntervalMin Interval in minutes for cleaning up finished jobs, or false to disable - * @param cleanupCutoffMs Maximum age in milliseconds for finished jobs to be cleaned up. - */ - async startCron( - staleIntervalMin: number | false, - maxStaleMs: number, - maxClaimedMs: number, - cleanupIntervalMin: number | false, - cleanupCutoffMs: number, - ) { - logger("Worker").debug("Starting cron jobs"); - const promises: Promise[] = []; - - if (staleIntervalMin !== false) { - promises.push(this.startAndExecuteStaleJobsReleaseCron(staleIntervalMin, maxStaleMs, maxClaimedMs)); - } - - if (cleanupIntervalMin !== false) { - promises.push(this.startAndExecuteFinishedJobsCleanupCron(cleanupIntervalMin, cleanupCutoffMs)); - } - - await Promise.all(promises).catch((error) => { - logger("Worker").error(error); - }); - } } // Gate the bootstrap on the explicit flag the engine passes when forking, not on `!!process.send`. 
diff --git a/packages/engine/src/workers/worker-runtime.test.ts b/packages/engine/src/workers/worker-runtime.test.ts new file mode 100644 index 00000000..15765b0c --- /dev/null +++ b/packages/engine/src/workers/worker-runtime.test.ts @@ -0,0 +1,119 @@ +import { sidequestTest, SidequestTestFixture } from "@/tests/fixture"; +import { beforeEach, describe, expect, vi } from "vitest"; +import { Dispatcher } from "../execution/dispatcher"; +import { cleanupFinishedJobs } from "../routines/cleanup-finished-job"; +import { releaseStaleJobs } from "../routines/release-stale-jobs"; +import { WorkerRuntime } from "./worker-runtime"; + +const runMock = vi.hoisted(() => vi.fn()); + +vi.mock("../shared-runner", () => ({ + RunnerPool: vi.fn(function () { + return { + run: runMock, + destroy: vi.fn(), + }; + }), +})); + +const cronMocks = vi.hoisted(() => ({ + schedule: vi.fn(), +})); + +vi.mock("node-cron", () => ({ + default: { + schedule: cronMocks.schedule, + }, +})); + +vi.mock("../routines/cleanup-finished-job", () => ({ + cleanupFinishedJobs: vi.fn(() => undefined), +})); + +vi.mock("../routines/release-stale-jobs", () => ({ + releaseStaleJobs: vi.fn(() => undefined), +})); + +describe("WorkerRuntime", () => { + let runtime: WorkerRuntime; + + beforeEach(async ({ backend, config }) => { + await backend.migrate(); + vi.clearAllMocks(); + cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop: vi.fn() }); + runtime = new WorkerRuntime(backend, config); + }); + + describe("start", () => { + sidequestTest("starts the dispatcher and schedules both cron routines", async () => { + const dispatcherStart = vi.spyOn(Dispatcher.prototype, "start").mockImplementation(() => undefined); + + await runtime.start(); + + expect(dispatcherStart).toHaveBeenCalled(); + expect(cronMocks.schedule).toHaveBeenCalledTimes(2); + expect(cronMocks.schedule).toHaveBeenCalledWith("*/60 * * * *", expect.any(Function)); + }); + }); + + describe("startCron", () => { + sidequestTest("runs 
releaseStaleJobs when the stale cron executes", async ({ config }) => { + await runtime.startCron(); + + const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; + await cronCallback(); + + expect(releaseStaleJobs).toHaveBeenCalledWith( + expect.any(Object), + config.releaseStaleJobsMaxStaleMs, + config.releaseStaleJobsMaxClaimedMs, + ); + }); + + sidequestTest("runs cleanupFinishedJobs when the cleanup cron executes", async ({ config }) => { + await runtime.startCron(); + + const cronCallback = cronMocks.schedule.mock.calls[1][1] as () => unknown; + await cronCallback(); + + expect(cleanupFinishedJobs).toHaveBeenCalledWith(expect.any(Object), config.cleanupFinishedJobsOlderThan); + }); + + sidequestTest("does not schedule routines that are disabled", async ({ backend, config }) => { + const disabled = new WorkerRuntime(backend, { + ...config, + releaseStaleJobsIntervalMin: false, + cleanupFinishedJobsIntervalMin: false, + }); + + await disabled.startCron(); + + expect(cronMocks.schedule).not.toHaveBeenCalled(); + }); + + sidequestTest("swallows and logs errors thrown by a routine", async () => { + (releaseStaleJobs as unknown as ReturnType).mockRejectedValueOnce(new Error("fail")); + + await runtime.startCron(); + + const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown; + await expect(cronCallback()).resolves.toBeUndefined(); + expect(releaseStaleJobs).toHaveBeenCalled(); + }); + }); + + describe("shutdown", () => { + sidequestTest("stops the scheduled cron tasks and drains the dispatcher", async () => { + const stop = vi.fn(); + cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop }); + vi.spyOn(Dispatcher.prototype, "start").mockImplementation(() => undefined); + const dispatcherStop = vi.spyOn(Dispatcher.prototype, "stop").mockResolvedValue(undefined); + + await runtime.start(); + await runtime.shutdown(); + + expect(stop).toHaveBeenCalledTimes(2); + expect(dispatcherStop).toHaveBeenCalled(); + }); + }); +}); diff 
--git a/packages/engine/src/workers/worker-runtime.ts b/packages/engine/src/workers/worker-runtime.ts new file mode 100644 index 00000000..3a061019 --- /dev/null +++ b/packages/engine/src/workers/worker-runtime.ts @@ -0,0 +1,108 @@ +import { Backend } from "@sidequest/backend"; +import { logger } from "@sidequest/core"; +import cron, { ScheduledTask } from "node-cron"; +import { NonNullableEngineConfig } from "../engine"; +import { Dispatcher } from "../execution/dispatcher"; +import { ExecutorManager } from "../execution/executor-manager"; +import { QueueManager } from "../execution/queue-manager"; +import { cleanupFinishedJobs } from "../routines/cleanup-finished-job"; +import { releaseStaleJobs } from "../routines/release-stale-jobs"; + +/** + * Owns the runtime side of the engine: the dispatcher loop plus the stale-job and finished-job + * cron routines. + * + * It runs either inside the forked worker process (driven by {@link MainWorker}) or directly in the + * host process when the engine is started with `fork: false`. It does NOT own the backend + * lifecycle: closing the backend remains the responsibility of the {@link Engine} that created it. + */ +export class WorkerRuntime { + private dispatcher: Dispatcher; + private cronTasks: ScheduledTask[] = []; + + /** + * Creates a new WorkerRuntime. + * @param backend The backend instance. + * @param config The non-nullable engine configuration. + */ + constructor( + private backend: Backend, + private config: NonNullableEngineConfig, + ) { + this.dispatcher = new Dispatcher( + backend, + new QueueManager(backend, config.queues, config.queueDefaults), + new ExecutorManager(backend, config), + config.jobPollingInterval, + ); + } + + /** + * Starts the dispatcher loop and the cron routines. + */ + async start(): Promise { + this.dispatcher.start(); + await this.startCron(); + } + + /** + * Stops the cron routines and drains the dispatcher. 
Unlike the forked worker (which relies on + * process exit), the in-process runtime must explicitly stop its cron tasks to avoid leaks. + */ + async shutdown(): Promise { + logger("WorkerRuntime").debug("Stopping cron routines"); + // ScheduledTask.stop() returns `void | Promise`; normalize before aggregating. + await Promise.all(this.cronTasks.map((task) => Promise.resolve(task.stop()))); + this.cronTasks = []; + logger("WorkerRuntime").debug("Stopping dispatcher"); + await this.dispatcher.stop(); + } + + /** + * Schedules the stale-job and finished-job cron routines according to the config and runs each + * once immediately. Either routine can be disabled with a `false` interval. + */ + async startCron(): Promise { + const promises: Promise[] = []; + + if (this.config.releaseStaleJobsIntervalMin !== false) { + promises.push( + this.scheduleAndRun(this.config.releaseStaleJobsIntervalMin, "ReleaseStaleJob", () => + releaseStaleJobs( + this.backend, + this.config.releaseStaleJobsMaxStaleMs, + this.config.releaseStaleJobsMaxClaimedMs, + ), + ), + ); + } + + if (this.config.cleanupFinishedJobsIntervalMin !== false) { + promises.push( + this.scheduleAndRun(this.config.cleanupFinishedJobsIntervalMin, "CleanupJob", () => + cleanupFinishedJobs(this.backend, this.config.cleanupFinishedJobsOlderThan), + ), + ); + } + + await Promise.all(promises).catch((error) => logger("WorkerRuntime").error(error)); + } + + /** + * Schedules a recurring task at the given minute interval, tracks it for shutdown, and triggers + * an immediate run. 
+ */ + private scheduleAndRun(intervalMin: number, name: string, task: () => Promise): Promise { + logger("WorkerRuntime").debug(`Starting ${name} cron with interval: ${intervalMin} minutes`); + const scheduled = cron.schedule(`*/${intervalMin} * * * *`, async () => { + try { + logger("WorkerRuntime").debug(`Running ${name} task`); + await task(); + } catch (error: unknown) { + logger("WorkerRuntime").error(`Error on running ${name}!`, error); + } + }); + this.cronTasks.push(scheduled); + return scheduled.execute(); + } +} diff --git a/tests/integration/shared-test-suite.js b/tests/integration/shared-test-suite.js index 125b6ae6..531d98c5 100644 --- a/tests/integration/shared-test-suite.js +++ b/tests/integration/shared-test-suite.js @@ -310,7 +310,9 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM") queues: [{ name: "default" }], }); - const jobData = await Sidequest.build(TimeoutJob).enqueue(1000000); + // A few seconds: long enough to reliably cancel while running, short enough that it cannot + // outlive the suite's teardown (which truncates the table) if the cancel watcher is mid-poll. + const jobData = await Sidequest.build(TimeoutJob).enqueue(3000); await vi.waitUntil(() => Sidequest.job.get(jobData.id).then((job) => job?.state === "running"), 5000); // Cancel the job while it's running