Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions packages/docs/production/execution-modes.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ Use `fork: false` when:
- You're running an **integration test** and want to avoid IPC and process teardown flakiness.
- Your jobs need access to **live, in-process state** that can't cross a process boundary, for example a dependency-injection container.

::: danger No crash isolation with `fork: false`
With the default `fork: true`, a job that throws an unhandled exception or calls `process.exit()` only takes down the engine fork, and Sidequest restarts it. With `fork: false`, the engine shares your application's process: **a misbehaving job can crash your whole app.** Only use it when you understand and accept that.
:::

## `runner`: thread pool vs inline

```typescript
Expand Down Expand Up @@ -159,9 +155,7 @@ this.abortSignal.addEventListener("abort", () => {
| `runner: "thread"`, `abortGracePeriodMs: 0` (default) | No (worker is killed immediately) | Killed right away. |
| `runner: "thread"`, `abortGracePeriodMs > 0` | Yes, for the grace window | Killed after the grace period. |

::: danger Inline timeout/cancel only work if your job honors the signal
In `runner: "inline"` there is no way to forcibly stop a job. If your job does not pass `this.abortSignal` to its async work or check `this.abortSignal.aborted` / `throwIfAborted()`, then **timeouts and cancellation have no effect**: the job keeps running until it returns on its own. Treat `this.abortSignal` as mandatory for any long-running inline job.
:::
So in inline mode `this.abortSignal` is effectively mandatory for any long-running job: a job that does not honor it keeps running until it returns on its own (timeouts and cancellation cannot stop it).

### `abortGracePeriodMs`: graceful kill for thread jobs

Expand Down
25 changes: 0 additions & 25 deletions packages/engine/src/execution/executor-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,31 +236,6 @@ describe("ExecutorManager", () => {
await executorManager.destroy();
});

sidequestTest("aborts a running job when its row no longer exists", async ({ backend, config }) => {
await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });

const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);

// Simulate the row being deleted/truncated while the job runs: the watcher must still stop it.
const getJobSpy = vi.spyOn(backend, "getJob").mockResolvedValue(undefined);
runMock.mockImplementationOnce(
(_job: JobData, signal: AbortSignal) =>
new Promise((_, reject) => {
signal.addEventListener("abort", () => reject(new Error("The task has been aborted")));
}),
);

await executorManager.execute(queryConfig, jobData);

// eslint-disable-next-line @typescript-eslint/unbound-method
expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition));
expect(executorManager.totalActiveWorkers()).toBe(0);

await executorManager.destroy();
getJobSpy.mockRestore();
});

sidequestTest("does not crash when the final transition fails (job row gone)", async ({ backend, config }) => {
const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
const executorManager = new ExecutorManager(backend, config);
Expand Down
20 changes: 9 additions & 11 deletions packages/engine/src/execution/executor-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,8 @@ export class ExecutorManager {
const cancellationCheck = async () => {
while (isRunning) {
const watchedJob = await this.backend.getJob(job.id);
// Abort when the job was canceled, and also when its row no longer exists (deleted or
// truncated): the record is gone, so the run must be stopped rather than left to block
// shutdown forever.
if (!watchedJob || watchedJob.state === "canceled") {
logger("Executor Manager").debug(
`Aborting job ${job.id}: ${watchedJob ? "canceled" : "row no longer exists"}`,
);
if (watchedJob?.state === "canceled") {
logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`);
controller.abort(new JobCanceled());
isRunning = false;
return;
Expand Down Expand Up @@ -157,12 +152,15 @@ export class ExecutorManager {
isRunning = false;
const err = error as Error;
if (controller.signal.aborted) {
// The run produced no result because the worker was hard-killed (thread). The abort reason
// decides the terminal state: a timeout becomes a retry, anything else (cancellation) becomes
// canceled. The rejection is logged so a real error during the abort is not lost.
// The run produced no result because the worker was hard-killed (thread). Only a clear
// cancellation maps to canceled; every other abort reason (timeout, or anything else) defaults
// to a retry as a failsafe. The rejection is logged so a real error during the abort is kept.
const reason: unknown = controller.signal.reason;
logger("Executor Manager").debug(`Job ${job.id} was hard-killed (${String(reason)}): ${err.message}`);
const transition = reason instanceof JobTimeout ? new RetryTransition(reason) : new CancelTransition();
const transition =
reason instanceof JobCanceled
? new CancelTransition()
: new RetryTransition(reason instanceof Error ? reason : new Error(`Job aborted: ${String(reason)}`));
await this.applyTerminalTransition(job, transition);
} else {
logger("Executor Manager").error(`Unhandled error while executing job ${job.id}: ${err.message}`);
Expand Down
8 changes: 8 additions & 0 deletions packages/engine/src/shared-runner/runner-pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ describe("RunnerPool", () => {
expect(piscinaMockInstance.run).toHaveBeenCalledWith({ jobData, config }, { signal });
});

sidequestTest("rejects without running the job when the signal is already aborted", async () => {
const controller = new AbortController();
controller.abort(new Error("already gone"));

await expect(pool.run(jobData, controller.signal)).rejects.toThrow("already gone");
expect(piscinaMockInstance.run).not.toHaveBeenCalled();
});

sidequestTest(
"with a grace period, delivers the abort over a port and hard-kills after the grace",
async ({ config }) => {
Expand Down
12 changes: 7 additions & 5 deletions packages/engine/src/shared-runner/runner-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ export class RunnerPool implements JobRunner {
*/
run(job: JobData, signal?: AbortSignal): Promise<JobResult> {
logger("RunnerPool").debug(`Running job ${job.id} in pool`);

// Already aborted before we could start (e.g. canceled between claim and dispatch): don't run it.
if (signal?.aborted) {
return Promise.reject(signal.reason instanceof Error ? signal.reason : new Error("Job aborted before execution"));
}

const grace = this.nonNullConfig.abortGracePeriodMs;

if (!signal || grace <= 0) {
Expand All @@ -59,11 +65,7 @@ export class RunnerPool implements JobRunner {
graceTimer = setTimeout(() => hardKill.abort(), grace);
};

if (signal.aborted) {
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
signal.addEventListener("abort", onAbort, { once: true });

return this.pool
.run(
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/shared-test-suite.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ export function createIntegrationTestSuite(Sidequest, jobs, moduleType = "ESM")
queues: [{ name: "default" }],
});

const jobData = await Sidequest.build(TimeoutJob).enqueue(1000000);
// A few seconds: long enough to reliably cancel while running, short enough that it cannot
// outlive the suite's teardown (which truncates the table) if the cancel watcher is mid-poll.
const jobData = await Sidequest.build(TimeoutJob).enqueue(3000);

await vi.waitUntil(() => Sidequest.job.get(jobData.id).then((job) => job?.state === "running"), 5000);
// Cancel the job while it's running
Expand Down