diff --git a/packages/engine/src/execution/executor-manager.test.ts b/packages/engine/src/execution/executor-manager.test.ts
index 0b6d27b8..70a03fb9 100644
--- a/packages/engine/src/execution/executor-manager.test.ts
+++ b/packages/engine/src/execution/executor-manager.test.ts
@@ -236,6 +236,35 @@ describe("ExecutorManager", () => {
     await executorManager.destroy();
   });
 
+  sidequestTest("aborts a running job when its row no longer exists", async ({ backend, config }) => {
+    await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date() });
+
+    const queryConfig = await grantQueueConfig(backend, { name: "default", concurrency: 1 });
+    const executorManager = new ExecutorManager(backend, config);
+
+    // Simulate the row being deleted/truncated while the job runs: the watcher must still stop it.
+    const getJobSpy = vi.spyOn(backend, "getJob").mockResolvedValue(undefined);
+    runMock.mockImplementationOnce(
+      (_job: JobData, signal: AbortSignal) =>
+        new Promise((_, reject) => {
+          signal.addEventListener("abort", () => reject(new Error("The task has been aborted")));
+        }),
+    );
+
+    try {
+      await executorManager.execute(queryConfig, jobData);
+
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(JobTransitioner.apply).toHaveBeenCalledWith(backend, jobData, expect.any(CancelTransition));
+      expect(executorManager.totalActiveWorkers()).toBe(0);
+
+      await executorManager.destroy();
+    } finally {
+      // Restore in `finally` so a failed assertion cannot leave the getJob stub installed.
+      getJobSpy.mockRestore();
+    }
+  });
+
   sidequestTest("should abort job execution on timeout", async ({ backend, config }) => {
     jobData = await backend.updateJob({ ...jobData, state: "claimed", claimed_at: new Date(), timeout: 100 });
 
diff --git a/packages/engine/src/execution/executor-manager.ts b/packages/engine/src/execution/executor-manager.ts
index b06bef68..68253d33 100644
--- a/packages/engine/src/execution/executor-manager.ts
+++ b/packages/engine/src/execution/executor-manager.ts
@@ -113,11 +113,14 @@ export class ExecutorManager {
     isRunning = true;
     const cancellationCheck = async () => {
       while (isRunning) {
-        // The row can be missing transiently or if it was deleted; treat that as "not canceled"
-        // rather than dereferencing undefined and crashing the polling loop.
         const watchedJob = await this.backend.getJob(job.id);
-        if (watchedJob?.state === "canceled") {
-          logger("Executor Manager").debug(`Aborting job ${job.id}: canceled`);
+        // Abort when the job was canceled, and also when its row no longer exists (deleted or
+        // truncated): the record is gone, so the run must be stopped rather than left to block
+        // shutdown forever.
+        if (!watchedJob || watchedJob.state === "canceled") {
+          logger("Executor Manager").debug(
+            `Aborting job ${job.id}: ${watchedJob ? "canceled" : "row no longer exists"}`,
+          );
           controller.abort(new JobCanceled());
           isRunning = false;
           return;