diff --git a/CLAUDE.md b/CLAUDE.md
index 57e89cb9..771beb38 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -116,6 +116,7 @@ These are the things that aren't visually loud in either code or docs:
- **Don't wrap things "just in case."** No backwards-compat shims, no feature flags for hypothetical use cases, no validation at internal boundaries.
- **Match existing structure.** New engine concerns go under `packages/engine/src/`; cross-cutting types belong in `@sidequest/core`. Don't create a new package for a small piece.
- **Tests live next to the code.** `foo.ts` + `foo.test.ts` in the same folder; integration tests under `tests/integration/`. Backend changes must keep `@sidequest/backend-test` green for every driver.
+- **JSDoc every exported entity.** `CONTRIBUTING.md` requires JSDoc-style docstrings on all exports; match that when adding public API.
- **Commits follow Conventional Commits** (commitlint + Husky enforce it). semantic-release publishes from `master`.
## Scope guard
diff --git a/packages/core/src/job/abort-reason.test.ts b/packages/core/src/job/abort-reason.test.ts
new file mode 100644
index 00000000..1aecbf4a
--- /dev/null
+++ b/packages/core/src/job/abort-reason.test.ts
@@ -0,0 +1,24 @@
+import { describe, expect, it } from "vitest";
+import { deserializeAbortReason, JobCanceled, JobTimeout, serializeAbortReason } from "./abort-reason";
+
+describe("abort-reason", () => {
+ it("round-trips a JobTimeout reason through the wire form", () => {
+ const message = serializeAbortReason(new JobTimeout(1500));
+ expect(message).toEqual({ kind: "timeout", timeoutMs: 1500 });
+
+ const reason = deserializeAbortReason(message);
+ expect(reason).toBeInstanceOf(JobTimeout);
+ expect((reason as JobTimeout).timeoutMs).toBe(1500);
+ });
+
+ it("round-trips a JobCanceled reason through the wire form", () => {
+ const message = serializeAbortReason(new JobCanceled());
+ expect(message).toEqual({ kind: "canceled" });
+ expect(deserializeAbortReason(message)).toBeInstanceOf(JobCanceled);
+ });
+
+ it("treats any non-timeout reason as canceled", () => {
+ expect(serializeAbortReason(new Error("boom"))).toEqual({ kind: "canceled" });
+ expect(serializeAbortReason(undefined)).toEqual({ kind: "canceled" });
+ });
+});
diff --git a/packages/core/src/job/abort-reason.ts b/packages/core/src/job/abort-reason.ts
new file mode 100644
index 00000000..b9611981
--- /dev/null
+++ b/packages/core/src/job/abort-reason.ts
@@ -0,0 +1,54 @@
+/**
+ * Reason set on a job's `abortSignal` when the engine aborts a running job.
+ *
+ * Inspect `job.abortSignal.reason` inside `run` to tell why the job is being aborted, e.g. to log
+ * or clean up differently for a timeout vs an explicit cancellation.
+ */
+export type AbortReason = JobTimeout | JobCanceled;
+
+/**
+ * Set as the `abortSignal.reason` when a job is aborted because it exceeded its `timeout`.
+ */
+export class JobTimeout extends Error {
+ /** The timeout, in milliseconds, that was exceeded. */
+ readonly timeoutMs: number;
+
+ constructor(timeoutMs: number) {
+ super(`Job timed out after ${timeoutMs}ms`);
+ this.name = "JobTimeout";
+ this.timeoutMs = timeoutMs;
+ }
+}
+
+/**
+ * Set as the `abortSignal.reason` when a running job is aborted because it was canceled.
+ */
+export class JobCanceled extends Error {
+ constructor() {
+ super("Job was canceled");
+ this.name = "JobCanceled";
+ }
+}
+
+/**
+ * Structured-clone-safe wire form of an {@link AbortReason}, used to convey the reason to a job
+ * running in a worker thread (a live {@link AbortSignal} cannot cross the thread boundary).
+ */
+export type AbortReasonMessage = { kind: "timeout"; timeoutMs: number } | { kind: "canceled" };
+
+/**
+ * Encodes an abort reason into its wire form. Anything that is not a {@link JobTimeout} is treated
+ * as a cancellation.
+ * @param reason The abort reason (typically `signal.reason`).
+ */
+export function serializeAbortReason(reason: unknown): AbortReasonMessage {
+ return reason instanceof JobTimeout ? { kind: "timeout", timeoutMs: reason.timeoutMs } : { kind: "canceled" };
+}
+
+/**
+ * Rebuilds the proper {@link AbortReason} instance from its wire form.
+ * @param message The wire-form message.
+ */
+export function deserializeAbortReason(message: AbortReasonMessage): AbortReason {
+ return message.kind === "timeout" ? new JobTimeout(message.timeoutMs) : new JobCanceled();
+}
diff --git a/packages/core/src/job/index.ts b/packages/core/src/job/index.ts
index 9fc50091..a9016069 100644
--- a/packages/core/src/job/index.ts
+++ b/packages/core/src/job/index.ts
@@ -1 +1,2 @@
+export * from "./abort-reason";
export * from "./job";
diff --git a/packages/core/src/job/job.test.ts b/packages/core/src/job/job.test.ts
index 74706418..4124f5f1 100644
--- a/packages/core/src/job/job.test.ts
+++ b/packages/core/src/job/job.test.ts
@@ -37,6 +37,21 @@ describe("job.ts", () => {
expect(transition.result).toBe("foo bar");
});
+ it("exposes a non-aborting abortSignal by default", () => {
+ const job = new DummyJob();
+ expect(job.abortSignal).toBeInstanceOf(AbortSignal);
+ expect(job.abortSignal.aborted).toBe(false);
+ });
+
+ it("injects the abort signal at runtime", () => {
+ const job = new DummyJob();
+ const controller = new AbortController();
+ job.injectAbortSignal(controller.signal);
+ expect(job.abortSignal).toBe(controller.signal);
+ controller.abort();
+ expect(job.abortSignal.aborted).toBe(true);
+ });
+
it("creates a fail transition", () => {
const job = new DummyJob();
const transition = job.fail("error");
diff --git a/packages/core/src/job/job.ts b/packages/core/src/job/job.ts
index e32bd9c7..59cf2730 100644
--- a/packages/core/src/job/job.ts
+++ b/packages/core/src/job/job.ts
@@ -79,6 +79,18 @@ export abstract class Job implements JobData {
readonly backoff_strategy!: BackoffStrategy;
readonly retry_delay!: number | null;
+ /**
+ * Signal that fires when the engine aborts this run (timeout or cancellation). Available inside
+ * `run`. Pass it to abort-aware APIs (e.g. `fetch(url, { signal: this.abortSignal })`) or check it
+ * cooperatively (`this.abortSignal.throwIfAborted()`, `this.abortSignal.aborted`). The reason is a
+ * `JobTimeout` or `JobCanceled` (see `abortSignal.reason`).
+ *
+ * In `runner: "inline"` mode this is the only way a running job can be stopped. In `"thread"` mode
+ * the worker is also terminated, so honoring the signal is optional (but enables graceful cleanup).
+ * Defaults to a signal that never aborts when no run is in progress.
+ */
+ readonly abortSignal: AbortSignal = new AbortController().signal;
+
/**
* Initializes the job and resolves its script path.
*/
@@ -102,6 +114,14 @@ export abstract class Job implements JobData {
Object.assign(this, jobData);
}
+ /**
+ * Injects the abort signal for this run into the job instance at runtime.
+ * @param signal The abort signal the engine controls for this execution.
+ */
+ injectAbortSignal(signal: AbortSignal): void {
+ Object.assign(this, { abortSignal: signal });
+ }
+
/**
* The class name of this job.
*/
diff --git a/packages/docs/.vitepress/config.mts b/packages/docs/.vitepress/config.mts
index fdac912f..f341bd3b 100644
--- a/packages/docs/.vitepress/config.mts
+++ b/packages/docs/.vitepress/config.mts
@@ -99,6 +99,7 @@ export default defineConfig({
collapsed: false,
items: [
{ text: "Backends", link: "/backends" },
+ { text: "Execution Modes", link: "/execution-modes" },
{ text: "Graceful Shutdown", link: "/graceful-shutdown" },
{ text: "Cleanup", link: "/cleanup" },
{ text: "Manual Job Resolution", link: "/manual-resolution" },
diff --git a/packages/docs/getting-started/configuration.md b/packages/docs/getting-started/configuration.md
index 8be70380..442b4f95 100644
--- a/packages/docs/getting-started/configuration.md
+++ b/packages/docs/getting-started/configuration.md
@@ -122,6 +122,9 @@ await Sidequest.start({
minThreads: 4,
maxThreads: 8,
idleWorkerTimeout: 10000, // 10 seconds
+ fork: true, // run the engine in a child process
+ runner: "thread", // "thread" (worker pool) or "inline"
+ abortGracePeriodMs: 0, // grace before force-killing a timed-out/canceled thread job
// 4. Migration and startup
skipMigration: false,
@@ -179,32 +182,35 @@ await Sidequest.start({
### Configuration Options
-| Option | Description | Default |
-| -------------------------------- | --------------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
-| `backend.driver` | Backend driver package name (SQLite, Postgres, MySQL, MongoDB) | `@sidequest/sqlite-backend` |
-| `backend.config` | Backend-specific connection string or [Knex configuration object](https://knexjs.org/guide/#configuration-options) | `./sidequest.sqlite` |
-| `dashboard.enabled` | Whether to enable the dashboard web interface | `true` |
-| `dashboard.port` | Port for the dashboard web interface | `8678` |
-| `dashboard.auth` | Basic auth configuration with `user` and `password`. If omitted, no auth is required. | `undefined` |
-| `queues` | Array of queue configurations with name, concurrency, priority, and state | `[]` |
-| `maxConcurrentJobs` | Maximum number of jobs processed simultaneously across all queues | `10` |
-| `minThreads` | Minimum number of worker threads to use | Number of CPU cores |
-| `maxThreads` | Maximum number of worker threads to use | `minThreads * 2` |
-| `idleWorkerTimeout` | Timeout (milliseconds) for idle workers before they are terminated | `10000` (10 seconds) |
-| `skipMigration` | Whether to skip database migration on startup | `false` |
-| `releaseStaleJobsIntervalMin` | Frequency (minutes) for releasing stale jobs. Set to `false` to disable | `60` |
-| `releaseStaleJobsMaxStaleMs` | Maximum age (milliseconds) for a running job to be considered stale | `600000` (10 minutes) |
-| `releaseStaleJobsMaxClaimedMs` | Maximum age (milliseconds) for a claimed job to be considered stale | `60000` (1 minute) |
-| `cleanupFinishedJobsIntervalMin` | Frequency (minutes) for cleaning up finished jobs. Set to `false` to disable | `60` |
-| `cleanupFinishedJobsOlderThan` | Age (milliseconds) after which finished jobs are deleted | `2592000000` (30 days) |
-| `logger.level` | Minimum log level (`debug`, `info`, `warn`, `error`) | `info` |
-| `logger.json` | Whether to output logs in JSON format | `false` |
-| `gracefulShutdown` | Whether to enable graceful shutdown handling | `true` |
-| `jobDefaults` | Default values for new jobs. Used while enqueueing | `undefined` |
-| `queueDefaults` | Default values for auto-created queues | `undefined` |
-| `manualJobResolution` | Whether to manually resolve job classes. See [Manual Job Resolution](/production/manual-resolution) | `false` |
-| `jobsFilePath` | Optional path to the file where job classes are exported. Ignored if `manualJobResolution` is `false`. | `undefined` |
-| `jobPollingInterval` | Interval (milliseconds) for polling new jobs to process. Increase this number to reduce DB load at the cost of job start latency. | `100` (100 milliseconds) |
+| Option | Description | Default |
+| -------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
+| `backend.driver` | Backend driver package name (SQLite, Postgres, MySQL, MongoDB) | `@sidequest/sqlite-backend` |
+| `backend.config` | Backend-specific connection string or [Knex configuration object](https://knexjs.org/guide/#configuration-options) | `./sidequest.sqlite` |
+| `dashboard.enabled` | Whether to enable the dashboard web interface | `true` |
+| `dashboard.port` | Port for the dashboard web interface | `8678` |
+| `dashboard.auth` | Basic auth configuration with `user` and `password`. If omitted, no auth is required. | `undefined` |
+| `queues` | Array of queue configurations with name, concurrency, priority, and state | `[]` |
+| `maxConcurrentJobs` | Maximum number of jobs processed simultaneously across all queues | `10` |
+| `fork` | Run the engine in a child process (crash isolation). Set `false` to run in-process. See [Execution Modes](/production/execution-modes#fork-process-isolation) | `true` |
+| `runner` | How jobs run: `"thread"` (worker pool) or `"inline"` (current thread). See [Execution Modes](/production/execution-modes#runner-thread-pool-vs-inline) | `"thread"` |
+| `abortGracePeriodMs` | Grace period (ms) before a timed-out/canceled job's worker **thread** is force-killed. `0` kills immediately. No effect with `runner: "inline"`. See [Execution Modes](/production/execution-modes#cooperative-timeout-and-cancellation) | `0` |
+| `minThreads` | Minimum number of worker threads to use (`runner: "thread"` only) | Number of CPU cores |
+| `maxThreads` | Maximum number of worker threads to use (`runner: "thread"` only) | `minThreads * 2` |
+| `idleWorkerTimeout` | Timeout (milliseconds) for idle workers before they are terminated (`runner: "thread"` only) | `10000` (10 seconds) |
+| `skipMigration` | Whether to skip database migration on startup | `false` |
+| `releaseStaleJobsIntervalMin` | Frequency (minutes) for releasing stale jobs. Set to `false` to disable | `60` |
+| `releaseStaleJobsMaxStaleMs` | Maximum age (milliseconds) for a running job to be considered stale | `600000` (10 minutes) |
+| `releaseStaleJobsMaxClaimedMs` | Maximum age (milliseconds) for a claimed job to be considered stale | `60000` (1 minute) |
+| `cleanupFinishedJobsIntervalMin` | Frequency (minutes) for cleaning up finished jobs. Set to `false` to disable | `60` |
+| `cleanupFinishedJobsOlderThan` | Age (milliseconds) after which finished jobs are deleted | `2592000000` (30 days) |
+| `logger.level` | Minimum log level (`debug`, `info`, `warn`, `error`) | `info` |
+| `logger.json` | Whether to output logs in JSON format | `false` |
+| `gracefulShutdown` | Whether to enable graceful shutdown handling | `true` |
+| `jobDefaults` | Default values for new jobs. Used while enqueueing | `undefined` |
+| `queueDefaults` | Default values for auto-created queues | `undefined` |
+| `manualJobResolution` | Whether to manually resolve job classes. See [Manual Job Resolution](/production/manual-resolution) | `false` |
+| `jobsFilePath` | Optional path to the file where job classes are exported. Ignored if `manualJobResolution` is `false`. | `undefined` |
+| `jobPollingInterval` | Interval (milliseconds) for polling new jobs to process. Increase this number to reduce DB load at the cost of job start latency. | `100` (100 milliseconds) |
::: danger
If `auth` is not configured and `dashboard: true` is enabled in production, the dashboard will be publicly accessible. This is a security risk and **not recommended**.
diff --git a/packages/docs/guide/jobs/lifecycle.md b/packages/docs/guide/jobs/lifecycle.md
index 304d8544..53ab0d5e 100644
--- a/packages/docs/guide/jobs/lifecycle.md
+++ b/packages/docs/guide/jobs/lifecycle.md
@@ -67,7 +67,11 @@ Jobs can be manually canceled at any point before completion:
- Waiting jobs are immediately marked as `canceled`
- Claimed jobs are marked as `canceled` before execution an are prevented from running
-- Running jobs receive a cancellation signal and transition to `canceled`
+- Running jobs receive a cancellation signal via `this.abortSignal` and transition to `canceled`
+
+::: warning
+Stopping a _running_ job depends on the [execution mode](/production/execution-modes#cooperative-timeout-and-cancellation). With the default thread pool the worker is terminated. In `runner: "inline"` mode the job cannot be force-stopped, so it must honor `this.abortSignal`; a running inline job that ignores it finishes with its own result instead of `canceled`. The same applies to job timeouts.
+:::
## Best Practices
diff --git a/packages/docs/guide/jobs/running.md b/packages/docs/guide/jobs/running.md
index cf2e6c27..09a9fbe7 100644
--- a/packages/docs/guide/jobs/running.md
+++ b/packages/docs/guide/jobs/running.md
@@ -35,15 +35,16 @@ When you need finer control — fail without retrying, retry with a custom delay
Before `run()` executes, Sidequest injects read-only properties onto `this`:
-| Property | Type | Description |
-| ------------------- | ----------- | -------------------------------- |
-| `this.id` | `string` | Job ID |
-| `this.attempt` | `number` | Current attempt number (1-based) |
-| `this.max_attempts` | `number` | Maximum allowed attempts |
-| `this.queue` | `string` | Queue the job is running in |
-| `this.state` | `string` | Current state (`"running"`) |
-| `this.inserted_at` | `Date` | When the job was first enqueued |
-| `this.args` | `unknown[]` | The run arguments |
+| Property | Type | Description |
+| ------------------- | ------------- | --------------------------------------------------------------------------------------------------- |
+| `this.id` | `string` | Job ID |
+| `this.attempt` | `number` | Current attempt number (1-based) |
+| `this.max_attempts` | `number` | Maximum allowed attempts |
+| `this.queue` | `string` | Queue the job is running in |
+| `this.state` | `string` | Current state (`"running"`) |
+| `this.inserted_at` | `Date` | When the job was first enqueued |
+| `this.args` | `unknown[]` | The run arguments |
+| `this.abortSignal` | `AbortSignal` | Aborts when the job times out or is canceled. See [below](#responding-to-timeout-and-cancellation). |
::: warning
These properties are only available inside `run()`. They are `undefined` in the constructor.
@@ -57,9 +58,10 @@ These methods let you explicitly transition the job to a specific lifecycle stat
You must **`return`** the result of every flow control method. Calling one without returning it is a no-op — the transition won't happen.
```typescript
-this.fail("reason"); // ❌ does nothing
+this.fail("reason"); // ❌ does nothing
return this.fail("reason"); // ✅ transitions to failed
```
+
:::
### `return this.complete(result)`
@@ -129,15 +131,42 @@ async run(payload: unknown) {
Use `snooze` for time-based deferrals: rate limit windows, maintenance modes, business hours.
+## Responding to timeout and cancellation
+
+When a job exceeds its `timeout`, or is canceled (via the dashboard or `Sidequest.job.cancel(id)`), Sidequest aborts `this.abortSignal`. Use it to stop your work promptly:
+
+```typescript
+async run(url: string) {
+ // Pass it to any abort-aware API; it cancels automatically.
+ const res = await fetch(url, { signal: this.abortSignal });
+
+ // Or check it cooperatively in loops / between steps.
+ for (const item of await res.json()) {
+ this.abortSignal.throwIfAborted(); // throws if timed out / canceled
+ await process(item);
+ }
+}
+```
+
+`this.abortSignal.reason` is a `JobTimeout` or `JobCanceled` (both exported from `sidequest`) so you can react differently to each.
+
+::: danger Whether the signal can actually stop the job depends on the execution mode
+
+- In the default thread pool with `abortGracePeriodMs: 0`, the worker is terminated, so honoring the signal is optional (it just lets you clean up; set a grace period to get a cooperative window).
+- In **`runner: "inline"` mode there is no way to forcibly stop a job.** If your job ignores `this.abortSignal`, timeouts and cancellation **will not stop it**: it runs to completion. Honoring the signal is mandatory for long-running inline jobs.
+
+See [Execution Modes](/production/execution-modes#cooperative-timeout-and-cancellation) for the full behavior across modes.
+:::
+
## Choosing the right method
-| Situation | Use |
-|---|---|
-| Normal completion | `return result` or `return this.complete(result)` |
-| Permanent, unrecoverable error | `return this.fail(reason)` |
-| Transient error, controlled retry delay | `return this.retry(reason, delay)` |
-| Not the right time — try again later | `return this.snooze(delay)` |
-| Unexpected error — let Sidequest decide | `throw error` |
+| Situation | Use |
+| --------------------------------------- | ------------------------------------------------- |
+| Normal completion | `return result` or `return this.complete(result)` |
+| Permanent, unrecoverable error | `return this.fail(reason)` |
+| Transient error, controlled retry delay | `return this.retry(reason, delay)` |
+| Not the right time — try again later | `return this.snooze(delay)` |
+| Unexpected error — let Sidequest decide | `throw error` |
## Best practices
diff --git a/packages/docs/introduction/how-it-works.md b/packages/docs/introduction/how-it-works.md
index bccf7c42..771b16c7 100644
--- a/packages/docs/introduction/how-it-works.md
+++ b/packages/docs/introduction/how-it-works.md
@@ -30,6 +30,10 @@ Your app process
Because the engine is a separate process, a job that calls `process.exit()` or throws an unhandled exception will kill the engine process but **not your app**. The engine restarts automatically.
+::: tip
+This forked, worker-thread model is the default and the right choice for most deployments. For serverless runtimes, test suites, or framework integrations that need jobs to share live in-process state, you can run the engine in-process and/or run jobs inline. See [Execution Modes](/production/execution-modes).
+:::
+
## How jobs are claimed
The Dispatcher polls the database at a configurable interval (default: **100 ms**). When it finds waiting jobs that fit within queue concurrency limits, it claims them atomically:
diff --git a/packages/docs/production/execution-modes.md b/packages/docs/production/execution-modes.md
new file mode 100644
index 00000000..5216ce97
--- /dev/null
+++ b/packages/docs/production/execution-modes.md
@@ -0,0 +1,194 @@
+---
+outline: deep
+title: Execution Modes
+description: Choose how and where Sidequest runs your jobs (forked vs in-process, thread pool vs inline) and how cooperative timeout/cancellation works.
+---
+
+# Execution Modes
+
+By default Sidequest runs your jobs with **two layers of isolation**: the engine runs in a forked child process, and each job runs in its own worker thread inside that process (see [How It Works](/introduction/how-it-works)). This is the most robust setup and what you want in most deployments.
+
+Some environments and integrations need a different trade-off. Two independent options let you change where and how jobs run:
+
+- [`fork`](#fork-process-isolation): run the engine in a child process (default) or in your application's process.
+- [`runner`](#runner-thread-pool-vs-inline): run each job in a worker thread pool (default) or inline in the current thread.
+
+They are orthogonal: `fork` controls the **process**, `runner` controls the **thread**. A related option, [`abortGracePeriodMs`](#cooperative-timeout-and-cancellation), controls how timeouts and cancellations stop a running job.
+
+::: tip TL;DR
+Keep the defaults (`fork: true`, `runner: "thread"`) unless you have a concrete reason not to. Reach for `inline` + `fork: false` for serverless, test suites, or framework integrations that need jobs to share live in-process state.
+:::
+
+## `fork`: process isolation
+
+```typescript
+await Sidequest.start({ fork: false }); // default: true
+```
+
+| Value | Where the engine runs | Crash isolation |
+| ---------------- | -------------------------- | -------------------------------------------------------------------------------------------------- |
+| `true` (default) | A `child_process.fork` | A job crash (or `process.exit()`) kills the fork, not your app. The engine restarts automatically. |
+| `false` | Your application's process | No isolation. An uncaught error in job code can take down your app. |
+
+Use `fork: false` when:
+
+- You can't spawn child processes (many **serverless / edge** runtimes).
+- You're running an **integration test** and want to avoid IPC and process teardown flakiness.
+- Your jobs need access to **live, in-process state** that can't cross a process boundary, for example a dependency-injection container.
+
+## `runner`: thread pool vs inline
+
+```typescript
+await Sidequest.start({ runner: "inline" }); // default: "thread"
+```
+
+| Value | How a job runs | CPU isolation | Can be force-stopped? |
+| -------------------- | ------------------------------------------------------------------ | ------------- | ------------------------------------- |
+| `"thread"` (default) | In a [piscina](https://github.com/piscinajs/piscina) worker thread | Yes | Yes (the worker thread is terminated) |
+| `"inline"` | Directly in the current thread, no pool | No | **No** |
+
+With `runner: "thread"`, `minThreads` / `maxThreads` / `idleWorkerTimeout` size the pool, and a job can be forcibly stopped by terminating its worker thread.
+
+With `runner: "inline"`, there is no pool and no separate thread. This is required when jobs must reach state that lives in the current thread, and it's handy for single-process setups. But it comes with two important consequences:
+
+::: warning Inline jobs block the event loop
+An inline job runs on the same thread as everything else in that process: the dispatcher, and your app too if `fork: false`. A **CPU-bound** inline job will starve all of it until it finishes. Keep inline jobs I/O-bound, or use the thread pool for heavy work.
+:::
+
+::: danger Inline jobs cannot be forcibly stopped
+There is no separate thread to terminate, so Sidequest **cannot** kill a running inline job. Timeouts and cancellation only work if the job **cooperates** with the abort signal (see [Cooperative timeout and cancellation](#cooperative-timeout-and-cancellation) below). A job that ignores the signal runs to completion no matter what.
+:::
+
+## Choosing a combination
+
+`fork` and `runner` combine into four setups:
+
+| `fork` | `runner` | Crash isolation | CPU isolation | Typical use |
+| ------- | -------- | ---------------- | ------------- | ----------------------------------------------------------------------- |
+| `true` | `thread` | ✅ | ✅ | **Default.** Production. |
+| `true` | `inline` | ✅ (engine fork) | ❌ | Lighter execution with crash isolation kept; e.g. SQLite single-writer. |
+| `false` | `thread` | ❌ | ✅ | Run in-process but still isolate CPU per job. |
+| `false` | `inline` | ❌ | ❌ | Serverless, tests, and integrations that need live in-process state. |
+
+::: code-group
+
+```typescript [Serverless / single-process]
+// No child process, no worker threads: everything in one place.
+await Sidequest.start({
+ fork: false,
+ runner: "inline",
+ backend: { driver: "@sidequest/postgres-backend", config: process.env.DATABASE_URL },
+});
+```
+
+```typescript [SQLite]
+// SQLite is single-writer; running jobs inline avoids cross-thread write contention.
+await Sidequest.start({
+ runner: "inline",
+ maxConcurrentJobs: 1,
+ backend: { driver: "@sidequest/sqlite-backend", config: "./jobs.sqlite" },
+});
+```
+
+```typescript [Integration tests]
+await Sidequest.start({
+ fork: false, // no IPC to wait on
+ runner: "inline", // deterministic, in-process execution
+ backend: { driver: "@sidequest/sqlite-backend", config: ":memory:" },
+});
+```
+
+:::
+
+::: warning SQLite and concurrency
+SQLite allows a single writer. Concurrency above 1 against the same file leads to `SQLITE_BUSY`. Keep `maxConcurrentJobs: 1`, use a separate `.sqlite` file from your app, or use a server database (Postgres/MySQL) for real concurrency. This is independent of the execution mode.
+:::
+
+## Cooperative timeout and cancellation
+
+A job is stopped early in two cases: it exceeds its `timeout`, or it is canceled (via the dashboard or `Sidequest.job.cancel(id)`). How that actually stops the job depends on the mode.
+
+Sidequest hands every job an [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) at `this.abortSignal`. When a timeout or cancellation fires, that signal aborts. Your job can observe it and stop:
+
+```typescript
+import { Job } from "sidequest";
+
+export class SyncContactsJob extends Job {
+ async run(accountId: string) {
+ // 1. Hand the signal to anything that accepts one; it aborts automatically.
+ const res = await fetch(`https://api.example.com/${accountId}/contacts`, {
+ signal: this.abortSignal,
+ });
+ const contacts = await res.json();
+
+ // 2. For long loops or CPU work, check it cooperatively.
+ for (const contact of contacts) {
+ this.abortSignal.throwIfAborted(); // bail out promptly on timeout/cancel
+ await upsert(contact);
+ }
+
+ return this.complete({ synced: contacts.length });
+ }
+}
+```
+
+`this.abortSignal.reason` tells you _why_ it aborted. It is a `JobTimeout` or a `JobCanceled`:
+
+```typescript
+import { JobTimeout, JobCanceled } from "sidequest";
+
+this.abortSignal.addEventListener("abort", () => {
+ const reason = this.abortSignal.reason;
+ if (reason instanceof JobTimeout) {
+ // exceeded `timeout`
+ } else if (reason instanceof JobCanceled) {
+ // canceled by an operator
+ }
+});
+```
+
+### When does the job actually receive the signal?
+
+| Mode | Gets a live `abortSignal`? | If the job ignores it |
+| ----------------------------------------------------- | --------------------------------- | --------------------------------------------- |
+| `runner: "inline"` | **Always** | Runs to completion (cannot be force-stopped). |
+| `runner: "thread"`, `abortGracePeriodMs: 0` (default) | No (worker is killed immediately) | Killed right away. |
+| `runner: "thread"`, `abortGracePeriodMs > 0` | Yes, for the grace window | Killed after the grace period. |
+
+So in inline mode `this.abortSignal` is effectively mandatory for any long-running job: a job that does not honor it keeps running until it returns on its own (timeouts and cancellation cannot stop it).
+
+### `abortGracePeriodMs`: graceful kill for thread jobs
+
+```typescript
+await Sidequest.start({ abortGracePeriodMs: 5000 }); // default: 0
+```
+
+Applies only to `runner: "thread"`. It controls the window between _signaling_ an abort and _forcibly terminating_ the worker thread:
+
+- `0` (default): the worker is terminated immediately. The job is not given a chance to react, and `this.abortSignal` is not delivered to it. This is the historical behavior.
+- `> 0`: the abort is delivered to the job via `this.abortSignal` first; if the job has not finished after this many milliseconds, the worker thread is terminated. Use this to let thread jobs clean up (close handles, flush buffers) before being killed.
+
+::: tip
+A positive grace period allocates a small message channel per job to deliver the abort into the worker. The cost only applies while a grace period is configured, and only matters for the rare cancel/timeout. Leave it at `0` unless your thread jobs need graceful shutdown.
+:::
+
+### What state does the job end in?
+
+The terminal state is decided when the run **actually ends**, never while it is still running (so a job is never re-queued while a copy of it is still in flight):
+
+| What happened | Terminal state |
+| -------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------- |
+| The job returned a value/transition (it finished) | Whatever the job returned (`completed`, `failed`, a retry, etc.). This holds **even if** a timeout/cancel was signaled but the job finished anyway. |
+| The worker was hard-killed by a **timeout** (thread, no result) | Retried (or `failed` if no attempts remain). |
+| The worker was hard-killed by a **cancellation** (thread, no result) | `canceled`. |
+| The job threw an unexpected error | Retried (or `failed`). |
+
+::: warning Canceling a running inline job is best-effort
+Because an inline job's result is respected once it returns, a running inline job that **ignores** a cancellation and finishes will be recorded with its own result (e.g. `completed`), not `canceled`. Cancellation of a _running_ inline job only takes effect if the job honors `this.abortSignal`. Canceling a **waiting** job always works (it is simply never claimed).
+:::
+
+## Next steps
+
+- [Execution and Control](/guide/jobs/running): using `this.abortSignal` inside `run()`
+- [Configuration reference](/getting-started/configuration): all engine options
+- [Graceful Shutdown](/production/graceful-shutdown): draining jobs on shutdown
diff --git a/packages/engine/src/engine.test.ts b/packages/engine/src/engine.test.ts
index b21f5028..70ece959 100644
--- a/packages/engine/src/engine.test.ts
+++ b/packages/engine/src/engine.test.ts
@@ -24,6 +24,23 @@ vi.mock("child_process", () => ({
}),
}));
+// Mock the in-process worker runtime so the no-fork path doesn't run a real dispatcher loop.
+const workerRuntimeMocks = vi.hoisted(() => ({
+ start: vi.fn().mockResolvedValue(undefined),
+ shutdown: vi.fn().mockResolvedValue(undefined),
+}));
+
+vi.mock("./workers/worker-runtime", () => ({
+ WorkerRuntime: vi.fn(function () {
+ return {
+ start: workerRuntimeMocks.start,
+ shutdown: workerRuntimeMocks.shutdown,
+ };
+ }),
+}));
+
+import { fork } from "child_process";
+
export class ParameterizedJob extends DummyJob {
constructor(
public param1: string,
@@ -399,6 +416,26 @@ describe("Engine", () => {
await engine.close();
});
+ sidequestTest("runs in-process and skips fork when fork is false", async () => {
+ vi.mocked(fork).mockClear();
+ workerRuntimeMocks.start.mockClear();
+ workerRuntimeMocks.shutdown.mockClear();
+
+ const engine = new Engine();
+
+ await engine.start({
+ backend: { driver: "@sidequest/sqlite-backend", config: ":memory:" },
+ fork: false,
+ gracefulShutdown: false,
+ });
+
+ expect(workerRuntimeMocks.start).toHaveBeenCalledTimes(1);
+ expect(fork).not.toHaveBeenCalled();
+
+ await engine.close();
+ expect(workerRuntimeMocks.shutdown).toHaveBeenCalledTimes(1);
+ });
+
sidequestTest("should warn when starting already started engine", async () => {
const engine = new Engine();
const config = {
diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts
index cb8daf87..c53e5904 100644
--- a/packages/engine/src/engine.ts
+++ b/packages/engine/src/engine.ts
@@ -13,6 +13,7 @@ import { JobBuilder, JobBuilderDefaults } from "./job/job-builder";
import { grantQueueConfig, QueueDefaults } from "./queue/grant-queue-config";
import { findSidequestJobsScriptInParentDirs, resolveScriptPath } from "./shared-runner";
import { clearGracefulShutdown, gracefulShutdown } from "./utils/shutdown";
+import { WorkerRuntime } from "./workers/worker-runtime";
/**
* Configuration options for the Sidequest engine.
@@ -40,6 +41,45 @@ export interface EngineConfig {
cleanupFinishedJobsOlderThan?: number;
/** Whether to enable graceful shutdown handling. Defaults to `true` */
gracefulShutdown?: boolean;
+ /**
+ * Whether to run the engine in a forked child process.
+ *
+ * - `true` (default): the engine runs in a `child_process.fork`, isolating job-code crashes from
+ * the host application.
+ * - `false`: the engine runs in the host process. A crash in job code can take down the host, but
+ * jobs can reach live in-process state. Useful for single-process setups (serverless, tests)
+ * and required by framework integrations that rely on in-process execution.
+ *
+ * Defaults to `true`.
+ */
+ fork?: boolean;
+ /**
+ * How jobs are executed.
+ *
+ * - `"thread"` (default): jobs run in a pool of worker threads (piscina). Gives CPU isolation
+ * and lets timeouts/cancellation forcibly abort a running job.
+ * - `"inline"`: jobs run in the current process/thread, with no worker pool. A running job cannot
+ * be forcibly terminated, so timeouts and cancellation are delivered cooperatively via
+ * `this.abortSignal` inside the job (the job must honor it to stop early); a job that ignores it
+ * runs to completion, and a CPU-bound job will block the event loop. Useful for single-process
+ * setups (serverless, tests, SQLite) and required when jobs need access to live in-process state.
+ *
+ * Defaults to `"thread"`.
+ */
+ runner?: "thread" | "inline";
+ /**
+ * Grace period, in milliseconds, between cooperatively aborting a running job (timeout or
+ * cancellation) and forcibly terminating its worker thread.
+ *
+ * Only applies to `runner: "thread"`. When greater than `0`, the abort is first delivered to the
+ * job via `this.abortSignal` so it can stop and clean up; if it has not finished after this many
+ * milliseconds, the worker thread is terminated. When `0` (default), the worker is terminated
+ * immediately with no cooperative window, preserving the previous behavior. Has no effect in
+ * `runner: "inline"` (there is no thread to terminate).
+ *
+ * Defaults to `0`.
+ */
+ abortGracePeriodMs?: number;
/** Minimum number of worker threads to use. Defaults to number of CPUs */
minThreads?: number;
/** Maximum number of worker threads to use. Defaults to `minThreads * 2` */
@@ -123,6 +163,12 @@ export class Engine {
*/
private mainWorker?: ChildProcess;
+ /**
+ * Worker runtime when the engine runs in-process (`fork: false`).
+ * Mutually exclusive with {@link mainWorker}.
+ */
+ private inProcessRuntime?: WorkerRuntime;
+
/**
* Flag indicating whether the engine is currently shutting down.
* This is used to prevent multiple shutdown attempts and ensure graceful shutdown behavior.
@@ -155,6 +201,9 @@ export class Engine {
json: config?.logger?.json ?? false,
},
gracefulShutdown: config?.gracefulShutdown ?? true,
+ fork: config?.fork ?? true,
+ runner: config?.runner ?? "thread",
+ abortGracePeriodMs: config?.abortGracePeriodMs ?? 0,
minThreads: config?.minThreads ?? cpus().length,
maxThreads: config?.maxThreads ?? cpus().length * 2,
idleWorkerTimeout: config?.idleWorkerTimeout ?? 10_000,
@@ -241,7 +290,7 @@ export class Engine {
* @param config Optional configuration object.
*/
async start(config: EngineConfig): Promise {
- if (this.mainWorker) {
+ if (this.mainWorker || this.inProcessRuntime) {
logger("Engine").warn("Sidequest engine already started");
return;
}
@@ -256,6 +305,14 @@ export class Engine {
}
}
+ if (!nonNullConfig.fork) {
+ logger("Engine").info("Starting Sidequest in-process (fork disabled)");
+ this.inProcessRuntime = new WorkerRuntime(dependencyRegistry.get(Dependency.Backend)!, nonNullConfig);
+ await this.inProcessRuntime.start();
+ gracefulShutdown(this.close.bind(this), "Engine", nonNullConfig.gracefulShutdown);
+ return;
+ }
+
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Timeout on starting sidequest fork!"));
@@ -323,12 +380,16 @@ export class Engine {
this.mainWorker.send({ type: "shutdown" });
await promise;
}
+ if (this.inProcessRuntime) {
+ await this.inProcessRuntime.shutdown();
+ }
try {
await dependencyRegistry.get(Dependency.Backend)?.close();
} catch (error) {
logger("Engine").error("Error closing backend:", error);
}
this.mainWorker = undefined;
+ this.inProcessRuntime = undefined;
// Reset the shutting down flag after closing
// This allows the engine to be reconfigured or restarted later
clearGracefulShutdown();
diff --git a/packages/engine/src/execution/dispatcher.ts b/packages/engine/src/execution/dispatcher.ts
index 493fe82c..eec75ba3 100644
--- a/packages/engine/src/execution/dispatcher.ts
+++ b/packages/engine/src/execution/dispatcher.ts
@@ -60,8 +60,11 @@ export class Dispatcher {
// because the execution is not awaited. This way we ensure that available slots
// are correctly calculated.
this.executorManager.queueJob(queue, job);
- // does not await for job execution.
- void this.executorManager.execute(queue, job);
+ // does not await for job execution. Guard against any unexpected rejection so a single
+ // job can never crash the engine with an unhandled promise rejection.
+ void this.executorManager.execute(queue, job).catch((error: unknown) => {
+ logger("Dispatcher").error(`Unexpected error executing job ${job.id}:`, error);
+ });
}
}
diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts
index 0565887f..850bfb78 100644
--- a/packages/engine/src/execution/executor-manager.test.ts
+++ b/packages/engine/src/execution/executor-manager.test.ts
@@ -1,13 +1,20 @@
import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
import { Backend } from "@sidequest/backend";
-import { CompletedResult, JobData, RetryTransition, RunTransition } from "@sidequest/core";
-import EventEmitter from "events";
+import {
+ CancelTransition,
+ CompletedResult,
+ CompleteTransition,
+ JobData,
+ RetryTransition,
+ RunTransition,
+} from "@sidequest/core";
import { JobTransitioner } from "../job/job-transitioner";
import { grantQueueConfig } from "../queue/grant-queue-config";
import { DummyJob } from "../test-jobs/dummy-job";
import { ExecutorManager } from "./executor-manager";
const runMock = vi.hoisted(() => vi.fn());
+const inlineRunMock = vi.hoisted(() => vi.fn());
vi.mock("../shared-runner", () => ({
RunnerPool: vi.fn().mockImplementation(function () {
@@ -16,6 +23,12 @@ vi.mock("../shared-runner", () => ({
destroy: vi.fn(),
};
}),
+ InlineRunner: vi.fn().mockImplementation(function () {
+ return {
+ run: inlineRunMock,
+ destroy: vi.fn(),
+ };
+ }),
}));
vi.mock("../job/job-transitioner", () => ({
@@ -65,12 +78,77 @@ describe("ExecutorManager", () => {
expect(executorManager.availableSlotsGlobal()).toEqual(9);
await execPromise;
- expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
+ expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal));
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
expect(executorManager.availableSlotsGlobal()).toEqual(10);
await executorManager.destroy();
});
+ sidequestTest("uses the inline runner when runner is 'inline'", async ({ backend, config }) => {
+ inlineRunMock.mockResolvedValue({
+ __is_job_transition__: true,
+ type: "completed",
+ result: "result",
+ } satisfies CompletedResult);
+ const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
+ const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" });
+
+ await executorManager.execute(queryConfig, jobData);
+
+ expect(inlineRunMock).toBeCalledWith(jobData, expect.any(AbortSignal));
+ expect(runMock).not.toHaveBeenCalled();
+ await executorManager.destroy();
+ inlineRunMock.mockReset();
+ });
+
+ sidequestTest(
+ "inline: a job that ignores the timeout completes instead of being retried",
+ async ({ backend, config }) => {
+ jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 20 });
+ const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
+ const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" });
+
+ // The job ignores the abort signal and completes after the timeout has already fired.
+ inlineRunMock.mockImplementationOnce(
+ () =>
+ new Promise((resolve) =>
+ setTimeout(() => resolve({ __is_job_transition__: true, type: "completed", result: "done" }), 60),
+ ),
+ );
+
+ await executorManager.execute(queryConfig, jobData);
+
+ // Timeout fired (signal aborted) but inline applies the job's own result: completed, not a retry.
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition));
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(RetryTransition));
+ await executorManager.destroy();
+ inlineRunMock.mockReset();
+ },
+ );
+
+ sidequestTest(
+ "inline: a job that ignores cancellation completes (its result wins)",
+ async ({ backend, config }) => {
+ // Pre-cancel in the DB so the first cancellation poll observes it immediately. JobTransitioner is
+ // mocked here, so the RunTransition does not overwrite the persisted state.
+ jobData = await backend.updateJob({ ...jobData, state: "canceled" });
+ const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
+ const executorManager = new ExecutorManager(backend, { ...config, runner: "inline" });
+
+ inlineRunMock.mockResolvedValue({ __is_job_transition__: true, type: "completed", result: "done" });
+
+ await executorManager.execute(queryConfig, jobData);
+
+ // Inline cannot force-stop the job; it completed, so its result is applied.
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition));
+ await executorManager.destroy();
+ inlineRunMock.mockReset();
+ },
+ );
+
sidequestTest("should abort job execution on job cancel", async ({ backend, config }) => {
await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });
@@ -78,9 +156,9 @@ describe("ExecutorManager", () => {
const executorManager = new ExecutorManager(backend, config);
let expectedPromise;
- runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => {
+ runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => {
const promise = new Promise((_, reject) => {
- signal.on("abort", () => {
+ signal.addEventListener("abort", () => {
reject(new Error("The task has been aborted"));
});
});
@@ -95,7 +173,7 @@ describe("ExecutorManager", () => {
expect(executorManager.availableSlotsGlobal()).toEqual(9);
await execPromise;
- expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
+ expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal));
expect(runMock).toHaveReturnedWith(expectedPromise);
await expect(expectedPromise).rejects.toThrow("The task has been aborted");
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
@@ -109,6 +187,79 @@ describe("ExecutorManager", () => {
await executorManager.destroy();
});
+ sidequestTest("respects a job's returned result even after a cancel was signaled", async ({ backend, config }) => {
+ await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });
+
+ const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
+ const executorManager = new ExecutorManager(backend, config);
+
+ // The job is canceled mid-run but ignores the abort signal and returns a completed result.
+ runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => {
+ await backend.updateJob({ ...job, state: "canceled" });
+ while (!signal.aborted) {
+ await new Promise((r) => setTimeout(r, 50));
+ }
+ return { __is_job_transition__: true, type: "completed", result: "result" } as CompletedResult;
+ });
+
+ await executorManager.execute(queryConfig, jobData);
+
+ // The job returned a state, so it is respected: the completion is applied.
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition));
+
+ await executorManager.destroy();
+ });
+
+ sidequestTest("a hard-killed canceled job transitions to canceled", async ({ backend, config }) => {
+ await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });
+
+ const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
+ const executorManager = new ExecutorManager(backend, config);
+
+ // The job is canceled and never produces a result (the worker is terminated -> the run rejects).
+ runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => {
+ await backend.updateJob({ ...job, state: "canceled" });
+ return new Promise((_, reject) => {
+ signal.addEventListener("abort", () => reject(new Error("The task has been aborted")));
+ });
+ });
+
+ await executorManager.execute(queryConfig, jobData);
+
+ // No result: the abort reason (canceled) decides the terminal state.
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition));
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(JobTransitioner.apply).not.toHaveBeenCalledWith(backend, jobData, expect.any(CompleteTransition));
+
+ await executorManager.destroy();
+ });
+
+ sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => {
+ const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
+ const executorManager = new ExecutorManager(backend, config);
+
+ // RunTransition succeeds; the terminal transition fails because the row was deleted mid-run.
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ vi.mocked(JobTransitioner.apply)
+ .mockImplementationOnce((_backend: Backend, job: JobData) => job)
+ .mockImplementationOnce(() => {
+ throw new Error("Cannot update job, not found.");
+ });
+ runMock.mockResolvedValue({
+ __is_job_transition__: true,
+ type: "completed",
+ result: "ok",
+ } satisfies CompletedResult);
+
+ // The fire-and-forget executor must not reject, and must free the job from the active set.
+ await expect(executorManager.execute(queryConfig, jobData)).resolves.toBeUndefined();
+ expect(executorManager.totalActiveWorkers()).toBe(0);
+
+ await executorManager.destroy();
+ });
+
sidequestTest("should abort job execution on timeout", async ({ backend, config }) => {
jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 });
@@ -116,9 +267,9 @@ describe("ExecutorManager", () => {
const executorManager = new ExecutorManager(backend, config);
let expectedPromise;
- runMock.mockImplementationOnce(async (job: JobData, signal: EventEmitter) => {
+ runMock.mockImplementationOnce(async (job: JobData, signal: AbortSignal) => {
const promise = new Promise((_, reject) => {
- signal.on("abort", () => {
+ signal.addEventListener("abort", () => {
reject(new Error("The task has been aborted"));
});
});
@@ -132,7 +283,7 @@ describe("ExecutorManager", () => {
expect(executorManager.availableSlotsGlobal()).toEqual(9);
await execPromise;
- expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
+ expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal));
expect(runMock).toHaveReturnedWith(expectedPromise);
await expect(expectedPromise).rejects.toThrow("The task has been aborted");
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
@@ -162,7 +313,7 @@ describe("ExecutorManager", () => {
expect(executorManager.availableSlotsGlobal()).toEqual(9);
await execPromise;
- expect(runMock).toBeCalledWith(jobData, expect.any(EventEmitter));
+ expect(runMock).toBeCalledWith(jobData, expect.any(AbortSignal));
expect(executorManager.availableSlotsByQueue(queryConfig)).toEqual(1);
expect(executorManager.availableSlotsGlobal()).toEqual(10);
diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts
index f97b357d..1468ba36 100644
--- a/packages/engine/src/execution/executor-manager.ts
+++ b/packages/engine/src/execution/executor-manager.ts
@@ -1,10 +1,20 @@
import { Backend } from "@sidequest/backend";
-import { JobData, JobTransitionFactory, logger, QueueConfig, RetryTransition, RunTransition } from "@sidequest/core";
-import EventEmitter from "events";
+import {
+ CancelTransition,
+ JobCanceled,
+ JobData,
+ JobTimeout,
+ JobTransition,
+ JobTransitionFactory,
+ logger,
+ QueueConfig,
+ RetryTransition,
+ RunTransition,
+} from "@sidequest/core";
import { inspect } from "util";
import { NonNullableEngineConfig } from "../engine";
import { JobTransitioner } from "../job/job-transitioner";
-import { RunnerPool } from "../shared-runner";
+import { InlineRunner, JobRunner, RunnerPool } from "../shared-runner";
/**
* Manages job execution and worker concurrency for Sidequest.
@@ -12,7 +22,7 @@ import { RunnerPool } from "../shared-runner";
export class ExecutorManager {
private activeByQueue: Record>;
private activeJobs: Set;
- private runnerPool: RunnerPool;
+ private jobRunner: JobRunner;
/**
* Creates a new ExecutorManager.
@@ -25,7 +35,10 @@ export class ExecutorManager {
) {
this.activeByQueue = {};
this.activeJobs = new Set();
- this.runnerPool = new RunnerPool(this.nonNullConfig);
+ this.jobRunner =
+ this.nonNullConfig.runner === "inline"
+ ? new InlineRunner(this.nonNullConfig)
+ : new RunnerPool(this.nonNullConfig);
}
/**
@@ -88,6 +101,8 @@ export class ExecutorManager {
*/
async execute(queueConfig: QueueConfig, job: JobData): Promise {
let isRunning = false;
+ const controller = new AbortController();
+ let timeoutHandle: ReturnType | undefined;
try {
logger("Executor Manager").debug(`Submitting job ${job.id} for execution in queue ${queueConfig.name}`);
// We call prepareJob here again to make sure the jobs are in the queues.
@@ -97,57 +112,91 @@ export class ExecutorManager {
job = await JobTransitioner.apply(this.backend, job, new RunTransition());
isRunning = true;
- const signal = new EventEmitter();
const cancellationCheck = async () => {
while (isRunning) {
const watchedJob = await this.backend.getJob(job.id);
- if (watchedJob!.state === "canceled") {
- logger("Executor Manager").debug(`Emitting abort signal for job ${job.id}`);
- signal.emit("abort");
+ if (watchedJob?.state === "canceled") {
+ logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`);
+ controller.abort(new JobCanceled());
isRunning = false;
return;
}
await new Promise((r) => setTimeout(r, 1000));
}
};
- void cancellationCheck();
+ void cancellationCheck().catch((error) => {
+ logger("Executor Manager").error(`Cancellation watcher for job ${job.id} failed:`, error);
+ });
logger("Executor Manager").debug(`Running job ${job.id} in queue ${queueConfig.name}`);
- const runPromise = this.runnerPool.run(job, signal);
+ const runPromise = this.jobRunner.run(job, controller.signal);
if (job.timeout) {
- void new Promise(() => {
- setTimeout(() => {
- logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`);
- signal.emit("abort");
- void JobTransitioner.apply(this.backend, job, new RetryTransition(`Job timed out after ${job.timeout}ms`));
- }, job.timeout!);
- });
+ // Only signal the abort here. The terminal transition is decided when the run actually ends
+ // (resolve or reject) so a still-running job is never re-queued underneath itself.
+ timeoutHandle = setTimeout(() => {
+ logger("Executor Manager").debug(`Job ${job.id} timed out after ${job.timeout}ms, aborting.`);
+ controller.abort(new JobTimeout(job.timeout!));
+ }, job.timeout);
}
const result = await runPromise;
isRunning = false;
- logger("Executor Manager").debug(`Job ${job.id} completed with result: ${inspect(result)}`);
- const transition = JobTransitionFactory.create(result);
- await JobTransitioner.apply(this.backend, job, transition);
+ // The job ran to a conclusion and returned a state (even if a timeout/cancel was signaled);
+ // respect it and transition accordingly.
+ logger("Executor Manager").debug(`Job ${job.id} settled with result: ${inspect(result)}`);
+ await this.applyTerminalTransition(job, JobTransitionFactory.create(result));
} catch (error: unknown) {
isRunning = false;
const err = error as Error;
- if (err.message === "The task has been aborted") {
- logger("Executor Manager").debug(`Job ${job.id} was aborted`);
+ if (controller.signal.aborted) {
+ // The run produced no result because the worker was hard-killed (thread). Only a clear
+ // cancellation maps to canceled; every other abort reason (timeout, or anything else) defaults
+ // to a retry as a failsafe. The rejection is logged so a real error during the abort is kept.
+ const reason: unknown = controller.signal.reason;
+ logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`);
+ const transition =
+ reason instanceof JobCanceled
+ ? new CancelTransition()
+ : new RetryTransition(reason instanceof Error ? reason : new Error(`Job aborted: ${String(reason)}`));
+ await this.applyTerminalTransition(job, transition);
} else {
logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`);
- await JobTransitioner.apply(this.backend, job, new RetryTransition(err));
+ await this.applyTerminalTransition(job, new RetryTransition(err));
}
} finally {
isRunning = false;
+ if (timeoutHandle) {
+ clearTimeout(timeoutHandle);
+ }
this.activeByQueue[queueConfig.name].delete(job.id);
this.activeJobs.delete(job.id);
}
}
+ /**
+ * Applies a job's final transition, tolerating the job row having disappeared.
+ *
+ * A job's row can be deleted while it runs (cleanup routine, an explicit delete, or a test
+ * truncating the table). Recording its terminal state is then impossible and safe to skip. This
+ * must never throw: `execute` is fire-and-forget, so an error here would surface as an unhandled
+ * rejection.
+ *
+ * @param job The job being finalized.
+ * @param transition The terminal transition to apply.
+ */
+ private async applyTerminalTransition(job: JobData, transition: JobTransition): Promise {
+ try {
+ await JobTransitioner.apply(this.backend, job, transition);
+ } catch (error) {
+ logger("Executor Manager").warn(
+ `Could not record terminal state for job ${job.id} (it may no longer exist): ${error instanceof Error ? error.message : String(error)}`,
+ );
+ }
+ }
+
/**
* Destroys the runner pool and releases resources.
*/
@@ -155,13 +204,13 @@ export class ExecutorManager {
await new Promise((resolve, reject) => {
const checkJobs = () => {
if (this.totalActiveWorkers() === 0) {
- logger("ExecutorManager").info("All active jobs finished. Destroying runner pool.");
+ logger("ExecutorManager").info("All active jobs finished. Destroying runner.");
try {
- this.runnerPool.destroy();
- logger("ExecutorManager").debug("Runner pool destroyed. Returning.");
+ this.jobRunner.destroy();
+ logger("ExecutorManager").debug("Runner destroyed. Returning.");
resolve();
} catch (error) {
- logger("ExecutorManager").error("Error while destroying runner pool:", error);
+ logger("ExecutorManager").error("Error while destroying runner:", error);
reject(error as Error);
}
} else {
diff --git a/packages/engine/src/shared-runner/index.ts b/packages/engine/src/shared-runner/index.ts
index e2d80d65..2112f5b0 100644
--- a/packages/engine/src/shared-runner/index.ts
+++ b/packages/engine/src/shared-runner/index.ts
@@ -1,4 +1,6 @@
import * as runner from "./runner";
+export * from "./inline-runner";
+export * from "./job-runner";
export * from "./manual-loader";
export * from "./runner-pool";
export { runner as run };
diff --git a/packages/engine/src/shared-runner/inline-runner.test.ts b/packages/engine/src/shared-runner/inline-runner.test.ts
new file mode 100644
index 00000000..74bc540f
--- /dev/null
+++ b/packages/engine/src/shared-runner/inline-runner.test.ts
@@ -0,0 +1,52 @@
+import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
+import { JobData } from "@sidequest/core";
+import { beforeEach, describe, expect, vi } from "vitest";
+import { DummyJob } from "../test-jobs/dummy-job";
+import { InlineRunner } from "./inline-runner";
+
+// Spy on the runner module's default export so we can assert how the InlineRunner delegates to it.
+vi.mock("./runner", async (importOriginal) => {
+ const original = await importOriginal();
+ return { default: vi.fn(original.default), injectSidequestConfig: original.injectSidequestConfig };
+});
+
+import run from "./runner";
+
+describe("InlineRunner", () => {
+ let runner: InlineRunner;
+ let jobData: JobData;
+
+ beforeEach(async ({ backend, config }) => {
+ vi.clearAllMocks();
+
+ const job = new DummyJob();
+ await job.ready();
+
+ jobData = await backend.createNewJob({
+ class: job.className,
+ script: job.script,
+ args: [],
+ attempt: 0,
+ queue: "default",
+ constructor_args: [],
+ state: "waiting",
+ });
+
+ runner = new InlineRunner(config);
+ });
+
+ sidequestTest("runs the job in-process and returns its result", async () => {
+ const result = await runner.run(jobData);
+ expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" });
+ });
+
+ sidequestTest("delegates to the runner with inline enabled and forwards the signal", async ({ config }) => {
+ const signal = new AbortController().signal;
+ await runner.run(jobData, signal);
+ expect(run).toHaveBeenCalledWith({ jobData, config, inline: true, signal });
+ });
+
+ sidequestTest("destroy is a no-op and does not throw", () => {
+ expect(() => runner.destroy()).not.toThrow();
+ });
+});
diff --git a/packages/engine/src/shared-runner/inline-runner.ts b/packages/engine/src/shared-runner/inline-runner.ts
new file mode 100644
index 00000000..f5ceb0fc
--- /dev/null
+++ b/packages/engine/src/shared-runner/inline-runner.ts
@@ -0,0 +1,40 @@
+import { JobData, JobResult, logger } from "@sidequest/core";
+import { NonNullableEngineConfig } from "../engine";
+import { JobRunner } from "./job-runner";
+import run from "./runner";
+
+/**
+ * Runs jobs in the current process/thread instead of a worker thread pool.
+ *
+ * Used by the inline execution mode (`runner: "inline"`). Unlike {@link RunnerPool}, a running job
+ * cannot be forcibly aborted: cancellation and timeouts are best-effort only, and a CPU-bound job
+ * will block the event loop. In exchange, jobs run in the host process and can reach live
+ * in-process state (the basis for framework integrations like NestJS).
+ */
+export class InlineRunner implements JobRunner {
+ /**
+ * Creates a new InlineRunner.
+ * @param nonNullConfig The non-nullable engine configuration.
+ */
+ constructor(private nonNullConfig: NonNullableEngineConfig) {}
+
+ /**
+ * Runs a job in the current process. The abort signal is forwarded to the job (as
+ * `this.abortSignal`) so it can stop cooperatively: inline execution has no separate thread to
+ * terminate, so this is the only way timeouts and cancellation can take effect.
+ * @param job The job data to run.
+ * @param signal Abort signal handed to the job for cooperative cancellation.
+ * @returns A promise resolving to the job result.
+ */
+ run(job: JobData, signal?: AbortSignal): Promise {
+ logger("InlineRunner").debug(`Running job ${job.id} inline`);
+ return run({ jobData: job, config: this.nonNullConfig, inline: true, signal });
+ }
+
+ /**
+ * Releases resources. No-op for the inline runner.
+ */
+ destroy(): void {
+ // There is no pool to tear down. In-flight jobs are awaited by the ExecutorManager.
+ }
+}
diff --git a/packages/engine/src/shared-runner/job-runner.ts b/packages/engine/src/shared-runner/job-runner.ts
new file mode 100644
index 00000000..14d851f0
--- /dev/null
+++ b/packages/engine/src/shared-runner/job-runner.ts
@@ -0,0 +1,23 @@
+import { JobData, JobResult } from "@sidequest/core";
+
+/**
+ * Abstraction over how a claimed job is actually executed.
+ *
+ * Implemented by the thread-based {@link RunnerPool} (piscina worker pool) and the
+ * {@link InlineRunner} (same-process execution). The {@link ExecutorManager} picks one based on
+ * the engine's `runner` configuration.
+ */
+export interface JobRunner {
+ /**
+ * Runs a job and resolves with its result.
+ * @param job The job data to run.
+ * @param signal Abort signal for the run. The thread runner uses it to terminate the worker; the
+ * inline runner forwards it to the job so it can stop cooperatively.
+ */
+ run(job: JobData, signal?: AbortSignal): Promise;
+
+ /**
+ * Releases any resources held by the runner.
+ */
+ destroy(): void;
+}
diff --git a/packages/engine/src/shared-runner/runner-pool.test.ts b/packages/engine/src/shared-runner/runner-pool.test.ts
index 8c4bba95..5059c9b4 100644
--- a/packages/engine/src/shared-runner/runner-pool.test.ts
+++ b/packages/engine/src/shared-runner/runner-pool.test.ts
@@ -1,6 +1,6 @@
import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
-import { JobData } from "@sidequest/core";
-import EventEmitter from "events";
+import { AbortReasonMessage, JobData, JobTimeout } from "@sidequest/core";
+import { MessagePort } from "node:worker_threads";
import { beforeEach, describe, expect, vi } from "vitest";
import { DummyJob } from "../test-jobs/dummy-job";
import { RunnerPool } from "./runner-pool";
@@ -39,14 +39,65 @@ describe("RunnerPool", () => {
pool = new RunnerPool(config);
});
- sidequestTest("should call pool.run with job data", async ({ config }) => {
- const emiter = new EventEmitter();
- const result = await pool.run(jobData, emiter);
+ sidequestTest("passes the abort signal straight to piscina when grace is 0", async ({ config }) => {
+ const signal = new AbortController().signal;
+ const result = await pool.run(jobData, signal);
expect(result).toEqual({ type: "completed", result: "ok" });
- expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal: emiter });
+ expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal });
});
+ sidequestTest("rejects without running the job when the signal is already aborted", async () => {
+ const controller = new AbortController();
+ controller.abort(new Error("already gone"));
+
+ await expect(pool.run(jobData, controller.signal)).rejects.toThrow("already gone");
+ expect(piscinaMockInstance.run).not.toHaveBeenCalled();
+ });
+
+ sidequestTest(
+ "with a grace period, delivers the abort over a port and hard-kills after the grace",
+ async ({ config }) => {
+ vi.useFakeTimers();
+ try {
+ const gracePool = new RunnerPool({ ...config, abortGracePeriodMs: 1000 });
+
+ let capturedPort: MessagePort | undefined;
+ let hardKillSignal: AbortSignal | undefined;
+ piscinaMockInstance.run.mockImplementationOnce(
+ (value: { abortPort: MessagePort }, opts: { signal: AbortSignal }) => {
+ capturedPort = value.abortPort;
+ hardKillSignal = opts.signal;
+ // Resolve only once piscina is asked to terminate (hard kill).
+ return new Promise((resolve) =>
+ opts.signal.addEventListener("abort", () => resolve({ type: "completed", result: "killed" })),
+ );
+ },
+ );
+
+ const controller = new AbortController();
+ const runPromise = gracePool.run(jobData, controller.signal);
+
+ const message = new Promise((resolve) =>
+ capturedPort!.once("message", (m: AbortReasonMessage) => resolve(m)),
+ );
+
+ controller.abort(new JobTimeout(5000));
+
+ expect(await message).toEqual({ kind: "timeout", timeoutMs: 5000 });
+ expect(hardKillSignal!.aborted).toBe(false);
+
+ // Grace elapses -> piscina is asked to terminate the worker.
+ await vi.advanceTimersByTimeAsync(1000);
+ expect(hardKillSignal!.aborted).toBe(true);
+
+ await runPromise;
+ } finally {
+ vi.useRealTimers();
+ }
+ },
+ );
+
sidequestTest("should call pool.destroy", () => {
pool.destroy();
expect(piscinaMockInstance.destroy).toHaveBeenCalled();
diff --git a/packages/engine/src/shared-runner/runner-pool.ts b/packages/engine/src/shared-runner/runner-pool.ts
index ab17d3fe..1ed8b20b 100644
--- a/packages/engine/src/shared-runner/runner-pool.ts
+++ b/packages/engine/src/shared-runner/runner-pool.ts
@@ -1,13 +1,14 @@
-import { JobData, JobResult, logger } from "@sidequest/core";
-import EventEmitter from "events";
+import { JobData, JobResult, logger, serializeAbortReason } from "@sidequest/core";
+import { MessageChannel } from "node:worker_threads";
import Piscina from "piscina";
import { DEFAULT_RUNNER_PATH } from "../constants";
import { NonNullableEngineConfig } from "../engine";
+import { JobRunner } from "./job-runner";
/**
* A pool of worker threads for running jobs in parallel using Piscina.
*/
-export class RunnerPool {
+export class RunnerPool implements JobRunner {
/** The underlying Piscina worker pool. */
private readonly pool: Piscina;
@@ -29,13 +30,53 @@ export class RunnerPool {
/**
* Runs a job in the worker pool.
+ *
+ * With `abortGracePeriodMs === 0` (default), an abort terminates the worker immediately. With a
+ * positive grace period, the abort is delivered to the job cooperatively over a transferred port
+ * (so it can stop via `this.abortSignal`), and the worker is only forcibly terminated if it has
+ * not finished within the grace period.
+ *
* @param job The job data to run.
- * @param signal Optional event emitter for cancellation.
+ * @param signal Optional abort signal for cancellation/timeout.
* @returns A promise resolving to the job result.
*/
- run(job: JobData, signal?: EventEmitter): Promise {
+ run(job: JobData, signal?: AbortSignal): Promise {
logger("RunnerPool").debug(`Running job ${job.id} in pool`);
- return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal });
+
+ // Already aborted before we could start (e.g. canceled between claim and dispatch): don't run it.
+ if (signal?.aborted) {
+ return Promise.reject(signal.reason instanceof Error ? signal.reason : new Error("Job aborted before execution"));
+ }
+
+ const grace = this.nonNullConfig.abortGracePeriodMs;
+
+ if (!signal || grace <= 0) {
+ // Abort terminates the worker immediately.
+ return this.pool.run({ jobData: job, config: this.nonNullConfig }, { signal });
+ }
+
+ // Deliver the abort cooperatively first, then hard-terminate after the grace period.
+ const channel = new MessageChannel();
+ const hardKill = new AbortController();
+ let graceTimer: ReturnType | undefined;
+
+ const onAbort = () => {
+ channel.port1.postMessage(serializeAbortReason(signal.reason));
+ graceTimer = setTimeout(() => hardKill.abort(), grace);
+ };
+
+ signal.addEventListener("abort", onAbort, { once: true });
+
+ return this.pool
+ .run(
+ { jobData: job, config: this.nonNullConfig, abortPort: channel.port2 },
+ { transferList: [channel.port2], signal: hardKill.signal },
+ )
+ .finally(() => {
+ if (graceTimer) clearTimeout(graceTimer);
+ signal.removeEventListener("abort", onAbort);
+ channel.port1.close();
+ });
}
/**
diff --git a/packages/engine/src/shared-runner/runner.test.ts b/packages/engine/src/shared-runner/runner.test.ts
index 555cd270..38378c8f 100644
--- a/packages/engine/src/shared-runner/runner.test.ts
+++ b/packages/engine/src/shared-runner/runner.test.ts
@@ -1,9 +1,11 @@
import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
-import { FailedResult, JobData } from "@sidequest/core";
+import { FailedResult, JobCanceled, JobData, JobTimeout } from "@sidequest/core";
import { existsSync, unlinkSync } from "node:fs";
import { unlink, writeFile } from "node:fs/promises";
import { join, resolve } from "node:path";
+import { MessageChannel } from "node:worker_threads";
import { vi } from "vitest";
+import { AbortAwareJob } from "../test-jobs/abort-aware-job";
import { DummyJob } from "../test-jobs/dummy-job";
import { DummyJobWithArgs } from "../test-jobs/dummy-job-with-args";
import { importSidequest } from "../utils/import";
@@ -52,6 +54,108 @@ describe("runner.ts", () => {
expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" });
});
+ sidequestTest("injects the config when not inline", async ({ config }) => {
+ vi.mocked(importSidequest).mockClear();
+ await run({ jobData, config });
+ expect(importSidequest).toHaveBeenCalled();
+ });
+
+ sidequestTest("skips config injection when inline", async ({ config }) => {
+ vi.mocked(importSidequest).mockClear();
+ const result = await run({ jobData, config, inline: true });
+ expect(result).toEqual({ __is_job_transition__: true, type: "completed", result: "dummy job" });
+ expect(importSidequest).not.toHaveBeenCalled();
+ });
+
+ sidequestTest("injects the abort signal into the job when provided", async ({ config }) => {
+ const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal");
+ const signal = new AbortController().signal;
+ await run({ jobData, config, signal });
+ expect(injectSpy).toHaveBeenCalledWith(signal);
+ injectSpy.mockRestore();
+ });
+
+ sidequestTest("does not inject an abort signal when none is provided", async ({ config }) => {
+ const injectSpy = vi.spyOn(DummyJob.prototype, "injectAbortSignal");
+ await run({ jobData, config });
+ expect(injectSpy).not.toHaveBeenCalled();
+ injectSpy.mockRestore();
+ });
+
+ sidequestTest("a job receives the abort signal and its reason and stops", async ({ backend, config }) => {
+ const job = new AbortAwareJob();
+ await job.ready();
+ const abortJobData = await backend.createNewJob({
+ class: job.className,
+ script: job.script,
+ args: [],
+ attempt: 0,
+ queue: "default",
+ constructor_args: [],
+ state: "waiting",
+ });
+
+ const controller = new AbortController();
+ const resultPromise = run({ jobData: abortJobData, config, inline: true, signal: controller.signal });
+ controller.abort(new JobCanceled());
+
+ // The job resolves once it observes the abort (whether before it starts or while awaiting it).
+ const result = (await resultPromise) as FailedResult;
+ expect(result.type).toBe("failed");
+ expect(result.error.message).toContain("JobCanceled");
+ });
+
+ sidequestTest("a job sees an already-aborted signal before it starts", async ({ backend, config }) => {
+ const job = new AbortAwareJob();
+ await job.ready();
+ const abortJobData = await backend.createNewJob({
+ class: job.className,
+ script: job.script,
+ args: [],
+ attempt: 0,
+ queue: "default",
+ constructor_args: [],
+ state: "waiting",
+ });
+
+ const controller = new AbortController();
+ controller.abort(new JobTimeout(50));
+
+ const result = (await run({
+ jobData: abortJobData,
+ config,
+ inline: true,
+ signal: controller.signal,
+ })) as FailedResult;
+ expect(result.type).toBe("failed");
+ expect(result.error.message).toContain("aborted before start: JobTimeout");
+ });
+
+ sidequestTest("a thread job is aborted via the abort port (rebuilds the reason)", async ({ backend, config }) => {
+ const job = new AbortAwareJob();
+ await job.ready();
+ const abortJobData = await backend.createNewJob({
+ class: job.className,
+ script: job.script,
+ args: [],
+ attempt: 0,
+ queue: "default",
+ constructor_args: [],
+ state: "waiting",
+ });
+
+ const channel = new MessageChannel();
+ // Thread path: no live signal, the abort arrives over the transferred port.
+ const resultPromise = run({ jobData: abortJobData, config, abortPort: channel.port2 });
+ channel.port1.postMessage({ kind: "timeout", timeoutMs: 1234 });
+
+ const result = (await resultPromise) as FailedResult;
+ expect(result.type).toBe("failed");
+ // The worker reconstructs the JobTimeout reason from the port message.
+ expect(result.error.message).toContain("JobTimeout");
+ channel.port1.close();
+ });
+
sidequestTest("fails with invalid script", async ({ config }) => {
jobData.script = "invalid!";
const result = (await run({ jobData, config })) as FailedResult;
diff --git a/packages/engine/src/shared-runner/runner.ts b/packages/engine/src/shared-runner/runner.ts
index 8e31abde..65219224 100644
--- a/packages/engine/src/shared-runner/runner.ts
+++ b/packages/engine/src/shared-runner/runner.ts
@@ -1,18 +1,66 @@
-import { Job, JobClassType, JobData, JobResult, logger, resolveScriptPathForJob, toErrorData } from "@sidequest/core";
+import {
+ AbortReasonMessage,
+ deserializeAbortReason,
+ Job,
+ JobClassType,
+ JobData,
+ JobResult,
+ logger,
+ resolveScriptPathForJob,
+ toErrorData,
+} from "@sidequest/core";
import { existsSync } from "node:fs";
import { fileURLToPath } from "node:url";
+import { MessagePort } from "node:worker_threads";
import { EngineConfig } from "../engine";
import { importSidequest } from "../utils";
import { findSidequestJobsScriptInParentDirs, MANUAL_SCRIPT_TAG, resolveScriptPath } from "./manual-loader";
+/**
+ * Builds an {@link AbortSignal} for a worker-thread job from an abort port.
+ *
+ * The thread runner cannot receive a live `AbortSignal` across the worker boundary, so the engine
+ * transfers a {@link MessagePort} and posts an abort message on it; this turns that message into a
+ * local signal carrying the proper `JobTimeout`/`JobCanceled` reason.
+ */
+function signalFromAbortPort(port: MessagePort): AbortSignal {
+ const controller = new AbortController();
+ port.on("message", (message: AbortReasonMessage) => {
+ controller.abort(deserializeAbortReason(message));
+ });
+ return controller.signal;
+}
+
/**
* Runs a job by dynamically importing its script and executing the specified class.
- * @param jobData The job data containing script and class information
+ * @param jobData The job data containing script and class information.
* @param config The non-nullable engine configuration.
+ * @param inline Whether the job runs inline in the host process. When true, the Sidequest config is
+ * not re-injected (the host process is already configured).
+ * @param signal Abort signal handed to the job as `this.abortSignal` (used by the inline runner,
+ * which executes in the same process).
+ * @param abortPort Port the thread runner uses to receive the abort cooperatively across the worker
+ * boundary; it is turned into the job's `this.abortSignal`. Mutually exclusive with `signal`.
* @returns A promise resolving to the job result.
*/
-export default async function run({ jobData, config }: { jobData: JobData; config: EngineConfig }): Promise {
- await injectSidequestConfig(config);
+export default async function run({
+ jobData,
+ config,
+ inline,
+ signal,
+ abortPort,
+}: {
+ jobData: JobData;
+ config: EngineConfig;
+ inline?: boolean;
+ signal?: AbortSignal;
+ abortPort?: MessagePort;
+}): Promise {
+ // In inline mode the job runs in the host process, where Sidequest is already configured, so
+ // re-injecting the config is redundant. In a worker thread the module is fresh and needs it.
+ if (!inline) {
+ await injectSidequestConfig(config);
+ }
let script: Record = {};
try {
@@ -65,9 +113,24 @@ export default async function run({ jobData, config }: { jobData: JobData; confi
const job: Job = new JobClass(...jobData.constructor_args);
job.injectJobData(jobData);
+ // Exactly one of these is provided: inline passes a live signal; the thread runner passes an abort
+ // port it turns into one.
+ let abortSignal: AbortSignal | undefined;
+ if (signal) {
+ abortSignal = signal;
+ } else if (abortPort) {
+ abortSignal = signalFromAbortPort(abortPort);
+ }
+ if (abortSignal) {
+ job.injectAbortSignal(abortSignal);
+ }
logger("Runner").debug(`Executing job class "${jobData.class}" with args:`, jobData.args);
- return job.perform(...jobData.args);
+ try {
+ return await job.perform(...jobData.args);
+ } finally {
+ abortPort?.close();
+ }
}
/**
diff --git a/packages/engine/src/test-jobs/abort-aware-job.js b/packages/engine/src/test-jobs/abort-aware-job.js
new file mode 100644
index 00000000..bc9704de
--- /dev/null
+++ b/packages/engine/src/test-jobs/abort-aware-job.js
@@ -0,0 +1,19 @@
+import { Job } from "@sidequest/core";
+
+/**
+ * A job that honors `this.abortSignal`: it waits until aborted and reports the abort reason.
+ * Used to verify cooperative cancellation/timeout end-to-end through the runner.
+ */
+export class AbortAwareJob extends Job {
+ async run() {
+ if (this.abortSignal.aborted) {
+ return this.fail(`aborted before start: ${this.abortSignal.reason?.name}`);
+ }
+
+ await new Promise((resolve) => {
+ this.abortSignal.addEventListener("abort", resolve, { once: true });
+ });
+
+ return this.fail(`aborted: ${this.abortSignal.reason?.name}`);
+ }
+}
diff --git a/packages/engine/src/workers/index.ts b/packages/engine/src/workers/index.ts
index d66d2cdb..9a327bf0 100644
--- a/packages/engine/src/workers/index.ts
+++ b/packages/engine/src/workers/index.ts
@@ -1 +1,2 @@
export * from "./main";
+export * from "./worker-runtime";
diff --git a/packages/engine/src/workers/main.test.ts b/packages/engine/src/workers/main.test.ts
index a5dedb5d..b193e525 100644
--- a/packages/engine/src/workers/main.test.ts
+++ b/packages/engine/src/workers/main.test.ts
@@ -4,8 +4,6 @@ import { randomUUID } from "node:crypto";
import { beforeEach, describe, expect, vi } from "vitest";
import { Dispatcher } from "../execution/dispatcher";
import { grantQueueConfig } from "../queue";
-import { cleanupFinishedJobs } from "../routines/cleanup-finished-job";
-import { releaseStaleJobs } from "../routines/release-stale-jobs";
import { MainWorker } from "./main";
const runMock = vi.hoisted(() => vi.fn());
@@ -20,7 +18,7 @@ vi.mock("../shared-runner", () => ({
}));
const cronMocks = vi.hoisted(() => ({
- schedule: vi.fn().mockReturnValue({ execute: vi.fn() }),
+ schedule: vi.fn().mockReturnValue({ execute: vi.fn(), stop: vi.fn() }),
}));
vi.mock("node-cron", () => ({
@@ -60,53 +58,15 @@ describe("main.ts", () => {
}
await worker.runWorker(config);
vi.resetAllMocks();
+ // resetAllMocks clears the return value, so re-establish the scheduled-task shape used by
+ // WorkerRuntime (which calls task.stop() on shutdown).
+ cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop: vi.fn() });
});
afterEach(async () => {
await worker.shutdown();
});
- describe("startCron", () => {
- sidequestTest("should schedule both cron jobs", async () => {
- await worker.startCron(60, 600_000, 60_000, 60, 0);
-
- expect(cronMocks.schedule).toHaveBeenCalledTimes(2);
- expect(cronMocks.schedule).toHaveBeenCalledWith("*/60 * * * *", expect.any(Function));
- });
-
- sidequestTest("should call releaseStaleJobs when release cron executes", async () => {
- await worker.startCron(60, 600_000, 60_000, 60, 0);
-
- const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown;
-
- await cronCallback();
-
- expect(releaseStaleJobs).toHaveBeenCalledWith(expect.any(Object), 600_000, 60_000);
- });
-
- sidequestTest("should call cleanupFinishedJobs when cleanup cron executes", async () => {
- await worker.startCron(60, 600_000, 60_000, 60, 0);
-
- const cronCallback = cronMocks.schedule.mock.calls[1][1] as () => unknown;
-
- await cronCallback();
-
- expect(cleanupFinishedJobs).toHaveBeenCalledWith(expect.any(Object), 0);
- });
-
- sidequestTest("should handle errors and log them when releaseStaleJobs fails", async () => {
- const error = new Error("fail");
- (releaseStaleJobs as unknown as ReturnType).mockRejectedValueOnce(error);
-
- await worker.startCron(60, 600_000, 60_000, 60, 0);
-
- const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown;
-
- await expect(cronCallback()).resolves.toBeUndefined();
- expect(releaseStaleJobs).toHaveBeenCalled();
- });
- });
-
describe("runWorker", () => {
sidequestTest("should call startCron after starting the worker", async ({ config }) => {
const mockWorkerRun = vi.fn().mockResolvedValueOnce(undefined);
diff --git a/packages/engine/src/workers/main.ts b/packages/engine/src/workers/main.ts
index 85353b63..c0ea6347 100644
--- a/packages/engine/src/workers/main.ts
+++ b/packages/engine/src/workers/main.ts
@@ -1,20 +1,13 @@
-import { Backend } from "@sidequest/backend";
import { logger } from "@sidequest/core";
-import cron from "node-cron";
import { WORKER_PROCESS_FLAG } from "../constants";
import { Engine, EngineConfig, NonNullableEngineConfig } from "../engine";
-import { Dispatcher } from "../execution/dispatcher";
-import { ExecutorManager } from "../execution/executor-manager";
-import { QueueManager } from "../execution/queue-manager";
-import { cleanupFinishedJobs } from "../routines/cleanup-finished-job";
-import { releaseStaleJobs } from "../routines/release-stale-jobs";
import { gracefulShutdown } from "../utils/shutdown";
+import { WorkerRuntime } from "./worker-runtime";
export class MainWorker {
shuttingDown = false;
- private dispatcher: Dispatcher | undefined;
+ private runtime?: WorkerRuntime;
private engine = new Engine();
- private backend?: Backend;
/**
* Starts a Sidequest worker process with the given configuration.
@@ -24,23 +17,8 @@ export class MainWorker {
if (!this.shuttingDown) {
try {
const nonNullConfig = await this.engine.configure({ ...sidequestConfig, skipMigration: true });
- this.backend = this.engine.getBackend()!;
-
- this.dispatcher = new Dispatcher(
- this.backend,
- new QueueManager(this.backend, nonNullConfig.queues, nonNullConfig.queueDefaults),
- new ExecutorManager(this.backend, nonNullConfig),
- nonNullConfig.jobPollingInterval,
- );
- this.dispatcher.start();
-
- await this.startCron(
- nonNullConfig.releaseStaleJobsIntervalMin,
- nonNullConfig.releaseStaleJobsMaxStaleMs,
- nonNullConfig.releaseStaleJobsMaxClaimedMs,
- nonNullConfig.cleanupFinishedJobsIntervalMin,
- nonNullConfig.cleanupFinishedJobsOlderThan,
- );
+ this.runtime = new WorkerRuntime(this.engine.getBackend()!, nonNullConfig);
+ await this.runtime.start();
} catch (error) {
logger("Worker").error(error);
process.exit(1);
@@ -56,91 +34,13 @@ export class MainWorker {
async shutdown() {
if (!this.shuttingDown) {
this.shuttingDown = true;
- logger("Worker").debug("Shutting down dispatcher");
- await this.dispatcher?.stop();
+ logger("Worker").debug("Shutting down worker runtime");
+ await this.runtime?.shutdown();
logger("Worker").debug("Shutting down engine");
await this.engine.close();
logger("Worker").debug("Main worker completely shut down");
}
}
-
- /**
- * Starts cron job for releasing stale jobs.
- * Also executes the task immediately.
- */
- async startAndExecuteStaleJobsReleaseCron(
- intervalMin: number,
- maxStaleMs: number,
- maxClaimedMs: number,
- ): Promise {
- if (!this.backend) {
- throw new Error("Backend is not initialized. Cannot start stale jobs release cron.");
- }
-
- logger("Worker").debug(`Starting stale jobs release cron with interval: ${intervalMin} minutes`);
- const releaseTask = cron.schedule(`*/${intervalMin} * * * *`, async () => {
- try {
- logger("Worker").debug("Running stale jobs release task");
- await releaseStaleJobs(this.backend!, maxStaleMs, maxClaimedMs);
- } catch (error: unknown) {
- logger("Worker").error("Error on running ReleaseStaleJob!", error);
- }
- });
- return releaseTask.execute();
- }
-
- /**
- * Starts cron job for cleaning up finished jobs.
- * Also executes the task immediately.
- */
- async startAndExecuteFinishedJobsCleanupCron(intervalMin: number, cutoffMs: number): Promise {
- if (!this.backend) {
- throw new Error("Backend is not initialized. Cannot start finished jobs cleanup cron.");
- }
-
- logger("Worker").debug(`Starting finished jobs cleanup cron with interval: ${intervalMin} minutes`);
- const cleanupTask = cron.schedule(`*/${intervalMin} * * * *`, async () => {
- try {
- logger("Worker").debug("Running finished jobs cleanup task");
- await cleanupFinishedJobs(this.backend!, cutoffMs);
- } catch (error: unknown) {
- logger("Worker").error("Error on running CleanupJob!", error);
- }
- });
- return cleanupTask.execute();
- }
-
- /**
- * Starts cron jobs for releasing stale jobs and cleaning up finished jobs.
- *
- * @param staleIntervalMin Interval in minutes for releasing stale jobs, or false to disable.
- * @param maxStaleMs Maximum age in milliseconds for stale jobs.
- * @param maxClaimedMs Maximum age in milliseconds for claimed jobs.
- * @param cleanupIntervalMin Interval in minutes for cleaning up finished jobs, or false to disable
- * @param cleanupCutoffMs Maximum age in milliseconds for finished jobs to be cleaned up.
- */
- async startCron(
- staleIntervalMin: number | false,
- maxStaleMs: number,
- maxClaimedMs: number,
- cleanupIntervalMin: number | false,
- cleanupCutoffMs: number,
- ) {
- logger("Worker").debug("Starting cron jobs");
- const promises: Promise[] = [];
-
- if (staleIntervalMin !== false) {
- promises.push(this.startAndExecuteStaleJobsReleaseCron(staleIntervalMin, maxStaleMs, maxClaimedMs));
- }
-
- if (cleanupIntervalMin !== false) {
- promises.push(this.startAndExecuteFinishedJobsCleanupCron(cleanupIntervalMin, cleanupCutoffMs));
- }
-
- await Promise.all(promises).catch((error) => {
- logger("Worker").error(error);
- });
- }
}
// Gate the bootstrap on the explicit flag the engine passes when forking, not on `!!process.send`.
diff --git a/packages/engine/src/workers/worker-runtime.test.ts b/packages/engine/src/workers/worker-runtime.test.ts
new file mode 100644
index 00000000..15765b0c
--- /dev/null
+++ b/packages/engine/src/workers/worker-runtime.test.ts
@@ -0,0 +1,119 @@
+import { sidequestTest, SidequestTestFixture } from "@/tests/fixture";
+import { beforeEach, describe, expect, vi } from "vitest";
+import { Dispatcher } from "../execution/dispatcher";
+import { cleanupFinishedJobs } from "../routines/cleanup-finished-job";
+import { releaseStaleJobs } from "../routines/release-stale-jobs";
+import { WorkerRuntime } from "./worker-runtime";
+
+const runMock = vi.hoisted(() => vi.fn());
+
+vi.mock("../shared-runner", () => ({
+ RunnerPool: vi.fn(function () {
+ return {
+ run: runMock,
+ destroy: vi.fn(),
+ };
+ }),
+}));
+
+const cronMocks = vi.hoisted(() => ({
+ schedule: vi.fn(),
+}));
+
+vi.mock("node-cron", () => ({
+ default: {
+ schedule: cronMocks.schedule,
+ },
+}));
+
+vi.mock("../routines/cleanup-finished-job", () => ({
+ cleanupFinishedJobs: vi.fn(() => undefined),
+}));
+
+vi.mock("../routines/release-stale-jobs", () => ({
+ releaseStaleJobs: vi.fn(() => undefined),
+}));
+
+describe("WorkerRuntime", () => {
+ let runtime: WorkerRuntime;
+
+ beforeEach(async ({ backend, config }) => {
+ await backend.migrate();
+ vi.clearAllMocks();
+ cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop: vi.fn() });
+ runtime = new WorkerRuntime(backend, config);
+ });
+
+ describe("start", () => {
+ sidequestTest("starts the dispatcher and schedules both cron routines", async () => {
+ const dispatcherStart = vi.spyOn(Dispatcher.prototype, "start").mockImplementation(() => undefined);
+
+ await runtime.start();
+
+ expect(dispatcherStart).toHaveBeenCalled();
+ expect(cronMocks.schedule).toHaveBeenCalledTimes(2);
+ expect(cronMocks.schedule).toHaveBeenCalledWith("*/60 * * * *", expect.any(Function));
+ });
+ });
+
+ describe("startCron", () => {
+ sidequestTest("runs releaseStaleJobs when the stale cron executes", async ({ config }) => {
+ await runtime.startCron();
+
+ const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown;
+ await cronCallback();
+
+ expect(releaseStaleJobs).toHaveBeenCalledWith(
+ expect.any(Object),
+ config.releaseStaleJobsMaxStaleMs,
+ config.releaseStaleJobsMaxClaimedMs,
+ );
+ });
+
+ sidequestTest("runs cleanupFinishedJobs when the cleanup cron executes", async ({ config }) => {
+ await runtime.startCron();
+
+ const cronCallback = cronMocks.schedule.mock.calls[1][1] as () => unknown;
+ await cronCallback();
+
+ expect(cleanupFinishedJobs).toHaveBeenCalledWith(expect.any(Object), config.cleanupFinishedJobsOlderThan);
+ });
+
+ sidequestTest("does not schedule routines that are disabled", async ({ backend, config }) => {
+ const disabled = new WorkerRuntime(backend, {
+ ...config,
+ releaseStaleJobsIntervalMin: false,
+ cleanupFinishedJobsIntervalMin: false,
+ });
+
+ await disabled.startCron();
+
+ expect(cronMocks.schedule).not.toHaveBeenCalled();
+ });
+
+ sidequestTest("swallows and logs errors thrown by a routine", async () => {
+ (releaseStaleJobs as unknown as ReturnType).mockRejectedValueOnce(new Error("fail"));
+
+ await runtime.startCron();
+
+ const cronCallback = cronMocks.schedule.mock.calls[0][1] as () => unknown;
+ await expect(cronCallback()).resolves.toBeUndefined();
+ expect(releaseStaleJobs).toHaveBeenCalled();
+ });
+ });
+
+ describe("shutdown", () => {
+ sidequestTest("stops the scheduled cron tasks and drains the dispatcher", async () => {
+ const stop = vi.fn();
+ cronMocks.schedule.mockReturnValue({ execute: vi.fn(), stop });
+ vi.spyOn(Dispatcher.prototype, "start").mockImplementation(() => undefined);
+ const dispatcherStop = vi.spyOn(Dispatcher.prototype, "stop").mockResolvedValue(undefined);
+
+ await runtime.start();
+ await runtime.shutdown();
+
+ expect(stop).toHaveBeenCalledTimes(2);
+ expect(dispatcherStop).toHaveBeenCalled();
+ });
+ });
+});
diff --git a/packages/engine/src/workers/worker-runtime.ts b/packages/engine/src/workers/worker-runtime.ts
new file mode 100644
index 00000000..3a061019
--- /dev/null
+++ b/packages/engine/src/workers/worker-runtime.ts
@@ -0,0 +1,108 @@
+import { Backend } from "@sidequest/backend";
+import { logger } from "@sidequest/core";
+import cron, { ScheduledTask } from "node-cron";
+import { NonNullableEngineConfig } from "../engine";
+import { Dispatcher } from "../execution/dispatcher";
+import { ExecutorManager } from "../execution/executor-manager";
+import { QueueManager } from "../execution/queue-manager";
+import { cleanupFinishedJobs } from "../routines/cleanup-finished-job";
+import { releaseStaleJobs } from "../routines/release-stale-jobs";
+
+/**
+ * Owns the runtime side of the engine: the dispatcher loop plus the stale-job and finished-job
+ * cron routines.
+ *
+ * It runs either inside the forked worker process (driven by {@link MainWorker}) or directly in the
+ * host process when the engine is started with `fork: false`. It does NOT own the backend
+ * lifecycle: closing the backend remains the responsibility of the {@link Engine} that created it.
+ */
+export class WorkerRuntime {
+ private dispatcher: Dispatcher;
+ private cronTasks: ScheduledTask[] = [];
+
+ /**
+ * Creates a new WorkerRuntime.
+ * @param backend The backend instance.
+ * @param config The non-nullable engine configuration.
+ */
+ constructor(
+ private backend: Backend,
+ private config: NonNullableEngineConfig,
+ ) {
+ this.dispatcher = new Dispatcher(
+ backend,
+ new QueueManager(backend, config.queues, config.queueDefaults),
+ new ExecutorManager(backend, config),
+ config.jobPollingInterval,
+ );
+ }
+
+ /**
+ * Starts the dispatcher loop and the cron routines.
+ */
+ async start(): Promise {
+ this.dispatcher.start();
+ await this.startCron();
+ }
+
+ /**
+ * Stops the cron routines and drains the dispatcher. Unlike the forked worker (which relies on
+ * process exit), the in-process runtime must explicitly stop its cron tasks to avoid leaks.
+ */
+ async shutdown(): Promise {
+ logger("WorkerRuntime").debug("Stopping cron routines");
+ // ScheduledTask.stop() returns `void | Promise`; normalize before aggregating.
+ await Promise.all(this.cronTasks.map((task) => Promise.resolve(task.stop())));
+ this.cronTasks = [];
+ logger("WorkerRuntime").debug("Stopping dispatcher");
+ await this.dispatcher.stop();
+ }
+
+ /**
+ * Schedules the stale-job and finished-job cron routines according to the config and runs each
+ * once immediately. Either routine can be disabled with a `false` interval.
+ */
+ async startCron(): Promise {
+ const promises: Promise[] = [];
+
+ if (this.config.releaseStaleJobsIntervalMin !== false) {
+ promises.push(
+ this.scheduleAndRun(this.config.releaseStaleJobsIntervalMin, "ReleaseStaleJob", () =>
+ releaseStaleJobs(
+ this.backend,
+ this.config.releaseStaleJobsMaxStaleMs,
+ this.config.releaseStaleJobsMaxClaimedMs,
+ ),
+ ),
+ );
+ }
+
+ if (this.config.cleanupFinishedJobsIntervalMin !== false) {
+ promises.push(
+ this.scheduleAndRun(this.config.cleanupFinishedJobsIntervalMin, "CleanupJob", () =>
+ cleanupFinishedJobs(this.backend, this.config.cleanupFinishedJobsOlderThan),
+ ),
+ );
+ }
+
+ await Promise.all(promises).catch((error) => logger("WorkerRuntime").error(error));
+ }
+
+ /**
+ * Schedules a recurring task at the given minute interval, tracks it for shutdown, and triggers
+ * an immediate run.
+ */
+ private scheduleAndRun(intervalMin: number, name: string, task: () => Promise): Promise {
+ logger("WorkerRuntime").debug(`Starting ${name} cron with interval: ${intervalMin} minutes`);
+ const scheduled = cron.schedule(`*/${intervalMin} * * * *`, async () => {
+ try {
+ logger("WorkerRuntime").debug(`Running ${name} task`);
+ await task();
+ } catch (error: unknown) {
+ logger("WorkerRuntime").error(`Error on running ${name}!`, error);
+ }
+ });
+ this.cronTasks.push(scheduled);
+ return scheduled.execute();
+ }
+}
diff --git a/tests/integration/shared-test-suite.js b/tests/integration/shared-test-suite.js
index 125b6ae6..531d98c5 100644
--- a/tests/integration/shared-test-suite.js
+++ b/tests/integration/shared-test-suite.js
@@ -310,7 +310,9 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM")
queues: [{ name: "default" }],
});
- const jobData = await Sidequest.build(TimeoutJob).enqueue(1000000);
+ // A few seconds: long enough to reliably cancel while running, short enough that it cannot
+ // outlive the suite's teardown (which truncates the table) if the cancel watcher is mid-poll.
+ const jobData = await Sidequest.build(TimeoutJob).enqueue(3000);
await vi.waitUntil(() => Sidequest.job.get(jobData.id).then((job) => job?.state === "running"), 5000);
// Cancel the job while it's running