diff --git a/docker-compose.user.yaml b/docker-compose.user.yaml index 28ab3b8..516fbd1 100644 --- a/docker-compose.user.yaml +++ b/docker-compose.user.yaml @@ -48,6 +48,9 @@ services: - phantom_data:/app/data - phantom_public:/app/public - phantom_repos:/app/repos + # Claude Code credentials (persists `claude login` across restarts so + # subscription users do not have to re-authenticate on every upgrade). + - phantom_claude:/home/phantom/.claude # Docker socket lets the agent create sibling containers on the host. # This is required for code execution and development tasks. - /var/run/docker.sock:/var/run/docker.sock @@ -114,6 +117,7 @@ volumes: phantom_data: phantom_public: phantom_repos: + phantom_claude: qdrant_data: ollama_data: diff --git a/docker-compose.yaml b/docker-compose.yaml index 0573f65..2821501 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -37,6 +37,9 @@ services: - phantom_data:/app/data - phantom_public:/app/public - phantom_repos:/app/repos + # Claude Code credentials (persists `claude login` across restarts so + # subscription users do not have to re-authenticate on every upgrade) + - phantom_claude:/home/phantom/.claude # Docker socket for sibling container creation - /var/run/docker.sock:/var/run/docker.sock depends_on: @@ -105,6 +108,7 @@ volumes: phantom_data: phantom_public: phantom_repos: + phantom_claude: qdrant_data: ollama_data: diff --git a/src/agent/runtime.ts b/src/agent/runtime.ts index 711c48e..5e08cf0 100644 --- a/src/agent/runtime.ts +++ b/src/agent/runtime.ts @@ -70,6 +70,16 @@ export class AgentRuntime { return this.config; } + /** + * Peek whether a session key is currently executing. The scheduler uses + * this to avoid even calling handleMessage when a prior execution of the + * same job is still in flight (Phase 2.5 C2 braces layer). Direct callers + * outside the scheduler still see the belt layer: an Error-shaped return. + */ + isSessionBusy(channelId: string, conversationId: string): boolean { + return this.activeSessions.has(`${channelId}:${conversationId}`); + } + async handleMessage( channelId: string, conversationId: string, @@ -80,8 +90,12 @@ export class AgentRuntime { const startTime = Date.now(); if (this.activeSessions.has(sessionKey)) { + // Belt layer for C2: return a loud, parseable Error so direct callers + // (router, trigger, secret save) stop treating the bounce as success. + // The scheduler adds its own braces layer via isSessionBusy. + console.warn(`[runtime] Session busy, bouncing concurrent message: ${sessionKey}`); return { - text: "I'm still working on your previous message. Please wait.", + text: "Error: session busy (previous execution still running)", sessionId: "", cost: emptyCost(), durationMs: 0, diff --git a/src/core/server.ts b/src/core/server.ts index 6244bb3..50ceb69 100644 --- a/src/core/server.ts +++ b/src/core/server.ts @@ -5,6 +5,7 @@ import { AuthMiddleware } from "../mcp/auth.ts"; import { loadMcpConfig } from "../mcp/config.ts"; import type { PhantomMcpServer } from "../mcp/server.ts"; import type { MemoryHealth } from "../memory/types.ts"; +import type { SchedulerHealthSummary } from "../scheduler/health.ts"; import { handleUiRequest } from "../ui/serve.ts"; const VERSION = "0.18.2"; @@ -17,6 +18,7 @@ type RoleInfoProvider = () => { id: string; name: string } | null; type OnboardingStatusProvider = () => string; type WebhookHandler = (req: Request) => Promise; type PeerHealthProvider = () => Record; +type SchedulerHealthProvider = () => SchedulerHealthSummary | null; type TriggerDeps = { runtime: AgentRuntime; slackChannel?: SlackChannel; @@ -31,6 +33,7 @@ let roleInfoProvider: RoleInfoProvider | null = null; let onboardingStatusProvider: OnboardingStatusProvider | null = null; let webhookHandler: WebhookHandler | null = null; let peerHealthProvider: PeerHealthProvider | null = null; +let schedulerHealthProvider: SchedulerHealthProvider | null = null; let triggerDeps: TriggerDeps | null = null; export function setMemoryHealthProvider(provider: MemoryHealthProvider): void { @@ -65,6 +68,10 @@ export function setPeerHealthProvider(provider: PeerHealthProvider): void { peerHealthProvider = provider; } +export function setSchedulerHealthProvider(provider: SchedulerHealthProvider): void { + schedulerHealthProvider = provider; +} + export function setTriggerDeps(deps: TriggerDeps): void { triggerDeps = deps; } @@ -97,6 +104,7 @@ export function startServer(config: PhantomConfig, startedAt: number): ReturnTyp const onboardingStatus = onboardingStatusProvider ? onboardingStatusProvider() : null; const peers = peerHealthProvider ? peerHealthProvider() : null; + const scheduler = schedulerHealthProvider ? schedulerHealthProvider() : null; return Response.json({ status, @@ -112,6 +120,7 @@ export function startServer(config: PhantomConfig, startedAt: number): ReturnTyp }, ...(onboardingStatus ? { onboarding: onboardingStatus } : {}), ...(peers && Object.keys(peers).length > 0 ? { peers } : {}), + ...(scheduler ? { scheduler } : {}), }); } diff --git a/src/db/__tests__/migrate.test.ts b/src/db/__tests__/migrate.test.ts index 3a00b15..618bed0 100644 --- a/src/db/__tests__/migrate.test.ts +++ b/src/db/__tests__/migrate.test.ts @@ -35,7 +35,7 @@ describe("runMigrations", () => { runMigrations(db); const migrationCount = db.query("SELECT COUNT(*) as count FROM _migrations").get() as { count: number }; - expect(migrationCount.count).toBe(9); + expect(migrationCount.count).toBe(10); }); test("tracks applied migration indices", () => { @@ -47,6 +47,6 @@ describe("runMigrations", () => { .all() .map((r) => (r as { index_num: number }).index_num); - expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8]); + expect(indices).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); }); }); diff --git a/src/db/schema.ts b/src/db/schema.ts index 39d0589..3054c12 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -97,4 +97,10 @@ export const MIGRATIONS: string[] = [ expires_at TEXT NOT NULL, completed_at TEXT )`, + + // Phase 2.5 scheduler hardening: record whether the last delivery attempt + // actually made it to Slack. null = never delivered, "delivered" = sent, + // "dropped:" = skipped at the delivery branch, "error:" = + // Slack threw during send. Existing rows keep null on migration. + "ALTER TABLE scheduled_jobs ADD COLUMN last_delivery_status TEXT", ]; diff --git a/src/index.ts b/src/index.ts index d032c4b..c3609c4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,6 +24,7 @@ import { setOnboardingStatusProvider, setPeerHealthProvider, setRoleInfoProvider, + setSchedulerHealthProvider, setTriggerDeps, setWebhookHandler, startServer, @@ -177,6 +178,7 @@ async function main(): Promise { // Wire scheduler into the agent (Slack channel set later after channel init) scheduler = new Scheduler({ db, runtime }); + setSchedulerHealthProvider(() => scheduler?.getHealthSummary() ?? null); // Pass factories (not singletons) so each query() gets fresh MCP server instances. // The underlying registries (DynamicToolRegistry, Scheduler) are singletons. @@ -593,9 +595,13 @@ async function main(): Promise { await router.connectAll(); - // Wire Slack into scheduler and /trigger now that channels are connected - if (scheduler && slackChannel && channelsConfig?.slack?.owner_user_id) { - scheduler.setSlackChannel(slackChannel, channelsConfig.slack.owner_user_id); + // Wire Slack into scheduler and /trigger now that channels are connected. + // The owner_user_id gate was removed in Phase 2.5 (C3): channel-id and + // user-id delivery targets do not need the owner; only target="owner" + // does, and the scheduler's delivery path records a loud "dropped" status + // in that specific case instead of silently no-oping every job. + if (scheduler && slackChannel) { + scheduler.setSlackChannel(slackChannel, channelsConfig?.slack?.owner_user_id ?? null); } if (scheduler) { await scheduler.start(); diff --git a/src/scheduler/__tests__/scheduler-hardening.test.ts b/src/scheduler/__tests__/scheduler-hardening.test.ts new file mode 100644 index 0000000..5448b2b --- /dev/null +++ b/src/scheduler/__tests__/scheduler-hardening.test.ts @@ -0,0 +1,920 @@ +import { Database } from "bun:sqlite"; +import { afterAll, beforeAll, beforeEach, describe, expect, mock, test } from "bun:test"; +import { runMigrations } from "../../db/migrate.ts"; +import { deliverResult } from "../delivery.ts"; +import { computeHealthSummary } from "../health.ts"; +import { cleanupOldTerminalJobs, staggerMissedJobs } from "../recovery.ts"; +import { computeBackoffNextRun, computeNextRunAt, validateSchedule } from "../schedule.ts"; +import { Scheduler } from "../service.ts"; +import { createSchedulerToolServer } from "../tool.ts"; +import { isValidSlackTarget } from "../types.ts"; +import type { ScheduledJob } from "../types.ts"; + +type MockRuntime = { + handleMessage: ReturnType; + isSessionBusy: ReturnType; + setMemoryContextBuilder: ReturnType; + setEvolvedConfig: ReturnType; + setRoleTemplate: ReturnType; + setOnboardingPrompt: ReturnType; + setMcpServers: ReturnType; + getLastTrackedFiles: ReturnType; + getActiveSessionCount: ReturnType; +}; + +function createMockRuntime(): MockRuntime { + return { + handleMessage: mock(async () => ({ + text: "Mock response", + sessionId: "mock-session", + cost: { totalUsd: 0, inputTokens: 0, outputTokens: 0, modelUsage: {} }, + durationMs: 10, + })), + isSessionBusy: mock(() => false), + setMemoryContextBuilder: mock(() => {}), + setEvolvedConfig: mock(() => {}), + setRoleTemplate: mock(() => {}), + setOnboardingPrompt: mock(() => {}), + setMcpServers: mock(() => {}), + getLastTrackedFiles: mock(() => []), + getActiveSessionCount: mock(() => 0), + }; +} + +function createMockSlackChannel() { + // Return types must match the real SlackChannel contract (Promise) + // so mockImplementation can return null to simulate upstream failures without a + // type cast. The delivery.ts code checks for null explicitly (Critical-1 fix). + return { + sendDm: mock(async (_userId: string, _text: string): Promise => "mock-ts"), + postToChannel: mock(async (_channelId: string, _text: string): Promise => "mock-ts"), + }; +} + +describe("Phase 2.5 scheduler fixes", () => { + let db: Database; + + beforeAll(() => { + db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + db.run("PRAGMA foreign_keys = ON"); + runMigrations(db); + }); + + beforeEach(() => { + db.run("DELETE FROM scheduled_jobs"); + }); + + afterAll(() => { + db.close(); + }); + + // ---------- C1: dead-on-arrival schedules rejected at creation ---------- + + describe("C1: dead-on-arrival schedule rejection", () => { + test("validateSchedule rejects invalid cron expression", () => { + const result = validateSchedule({ kind: "cron", expr: "not a cron" }); + expect(result).not.toBeNull(); + expect(result).toContain("invalid cron"); + }); + + test("validateSchedule rejects bad timezone", () => { + const result = validateSchedule({ kind: "cron", expr: "0 9 * * *", tz: "Not/A_Timezone" }); + expect(result).not.toBeNull(); + expect(result?.toLowerCase()).toContain("timezone"); + }); + + test("validateSchedule rejects 6-part cron (5-part mode)", () => { + const result = validateSchedule({ kind: "cron", expr: "*/30 0 9 * * *" }); + expect(result).not.toBeNull(); + }); + + test("validateSchedule rejects nicknames like @daily", () => { + const result = validateSchedule({ kind: "cron", expr: "@daily" }); + expect(result).not.toBeNull(); + }); + + test("validateSchedule rejects past at timestamp", () => { + const past = new Date(Date.now() - 60_000).toISOString(); + const result = validateSchedule({ kind: "at", at: past }); + expect(result).not.toBeNull(); + expect(result).toContain("past"); + }); + + test("validateSchedule rejects unparseable at timestamp", () => { + const result = validateSchedule({ kind: "at", at: "banana" }); + expect(result).not.toBeNull(); + expect(result).toContain("invalid"); + }); + + test("validateSchedule accepts valid cron", () => { + expect(validateSchedule({ kind: "cron", expr: "0 9 * * 1-5" })).toBeNull(); + }); + + test("validateSchedule accepts valid every", () => { + expect(validateSchedule({ kind: "every", intervalMs: 60_000 })).toBeNull(); + }); + + test("validateSchedule accepts valid at in the future", () => { + const future = new Date(Date.now() + 60_000).toISOString(); + expect(validateSchedule({ kind: "at", at: future })).toBeNull(); + }); + + test("createJob throws on invalid cron", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + expect(() => + scheduler.createJob({ + name: "Bad Cron", + schedule: { kind: "cron", expr: "not a cron" }, + task: "x", + }), + ).toThrow(/invalid schedule/); + }); + + test("createJob throws on bad timezone", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + expect(() => + scheduler.createJob({ + name: "Bad TZ", + schedule: { kind: "cron", expr: "0 9 * * *", tz: "Not/A_Real_Timezone" }, + task: "x", + }), + ).toThrow(/invalid schedule/); + }); + + test("createJob throws on past at timestamp", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const past = new Date(Date.now() - 60_000).toISOString(); + expect(() => + scheduler.createJob({ + name: "Past At", + schedule: { kind: "at", at: past }, + task: "x", + }), + ).toThrow(/past/); + }); + + test("createJob does NOT insert a row when schedule is invalid", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + expect(() => + scheduler.createJob({ + name: "Invalid", + schedule: { kind: "cron", expr: "banana" }, + task: "x", + }), + ).toThrow(); + const row = db.query("SELECT COUNT(*) as c FROM scheduled_jobs").get() as { c: number }; + expect(row.c).toBe(0); + }); + + test("tool action=create returns isError on bad cron", async () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const server = createSchedulerToolServer(scheduler); + // biome-ignore lint/suspicious/noExplicitAny: SDK tool internals + const toolObj = (server.instance as any)._registeredTools?.phantom_schedule; + expect(toolObj).toBeDefined(); + const result = await toolObj.handler({ + action: "create", + name: "Bad", + schedule: { kind: "cron", expr: "not a cron" }, + task: "do a thing", + }); + expect(result.isError).toBe(true); + expect(result.content[0].text).toContain("invalid schedule"); + }); + + test("tool action=create returns isError on duplicate name", async () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + scheduler.createJob({ + name: "Dup Tool", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + const server = createSchedulerToolServer(scheduler); + // biome-ignore lint/suspicious/noExplicitAny: SDK tool internals + const toolObj = (server.instance as any)._registeredTools?.phantom_schedule; + const result = await toolObj.handler({ + action: "create", + name: "Dup Tool", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "y", + }); + expect(result.isError).toBe(true); + expect(result.content[0].text).toContain("already exists"); + }); + + test("tool action=run rejects when scheduler is executing", async () => { + const runtime = createMockRuntime(); + // Hang handleMessage so we keep the executing flag true. + const pending: { resolve: (v: unknown) => void } = { resolve: () => {} }; + runtime.handleMessage.mockImplementation( + () => + new Promise((resolve) => { + pending.resolve = resolve; + }), + ); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const j1 = scheduler.createJob({ + name: "A", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + const j2 = scheduler.createJob({ + name: "B", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "y", + }); + const first = scheduler.runJobNow(j1.id); + await expect(scheduler.runJobNow(j2.id)).rejects.toThrow(/currently executing/); + pending.resolve({ + text: "done", + sessionId: "", + cost: { totalUsd: 0, inputTokens: 0, outputTokens: 0, modelUsage: {} }, + durationMs: 1, + }); + await first; + }); + }); + + // ---------- C2: runtime bounce belt + scheduler braces ---------- + + describe("C2: runtime concurrency bounce", () => { + test("scheduler skips fire when runtime reports busy (braces layer)", async () => { + const runtime = createMockRuntime(); + runtime.isSessionBusy.mockImplementation(() => true); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const job = scheduler.createJob({ + name: "Busy Skip", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "hi", + }); + + const before = scheduler.getJob(job.id); + const result = await scheduler.runJobNow(job.id); + + // Empty return means the fire was skipped. + expect(result).toBe(""); + // handleMessage is NOT called for the busy path. + expect(runtime.handleMessage).not.toHaveBeenCalled(); + + const after = scheduler.getJob(job.id); + // run_count, consecutive_errors, next_run_at all unchanged. + expect(after?.runCount).toBe(before?.runCount); + expect(after?.consecutiveErrors).toBe(before?.consecutiveErrors); + expect(after?.nextRunAt).toBe(before?.nextRunAt); + expect(after?.status).toBe("active"); + }); + + test("scheduler advances normally when runtime is not busy", async () => { + const runtime = createMockRuntime(); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const job = scheduler.createJob({ + name: "Normal", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "hi", + }); + await scheduler.runJobNow(job.id); + + const after = scheduler.getJob(job.id); + expect(after?.runCount).toBe(1); + expect(after?.lastRunStatus).toBe("ok"); + }); + }); + + // ---------- C3: owner_user_id gate removed ---------- + + describe("C3: setSlackChannel works when ownerUserId is null", () => { + test("scheduler accepts null ownerUserId", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const slack = createMockSlackChannel(); + // Must not throw and must not require ownerUserId. + expect(() => scheduler.setSlackChannel(slack as never, null)).not.toThrow(); + }); + + test("scheduler delivers to channel ID targets even with null owner", async () => { + const runtime = createMockRuntime(); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const slack = createMockSlackChannel(); + scheduler.setSlackChannel(slack as never, null); + + const job = scheduler.createJob({ + name: "Channel target", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + delivery: { channel: "slack", target: "C04ABC123" }, + }); + await scheduler.runJobNow(job.id); + + expect(slack.postToChannel).toHaveBeenCalledWith("C04ABC123", "Mock response"); + const after = scheduler.getJob(job.id); + expect(after?.lastDeliveryStatus).toBe("delivered"); + }); + }); + + // ---------- C4: delivery target validation + else branch + status column ---------- + + describe("C4: delivery target validation and outcome tracking", () => { + test("isValidSlackTarget accepts owner, C..., U...", () => { + expect(isValidSlackTarget("owner")).toBe(true); + expect(isValidSlackTarget("C04ABC123")).toBe(true); + expect(isValidSlackTarget("U04ABC123")).toBe(true); + }); + + test("isValidSlackTarget rejects #general, names, empty", () => { + expect(isValidSlackTarget("#general")).toBe(false); + expect(isValidSlackTarget("alice")).toBe(false); + expect(isValidSlackTarget("")).toBe(false); + expect(isValidSlackTarget("cXYZ")).toBe(false); // lowercase c + }); + + test("createJob throws on invalid delivery target format", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + expect(() => + scheduler.createJob({ + name: "Bad Target", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + delivery: { channel: "slack", target: "#general" }, + }), + ).toThrow(/invalid delivery.target/); + }); + + test("deliverResult records dropped:slack_channel_unset when Slack is not wired", async () => { + const job = { + name: "J", + delivery: { channel: "slack", target: "owner" }, + } as ScheduledJob; + const outcome = await deliverResult(job, "hello", { slackChannel: undefined, ownerUserId: null }); + expect(outcome).toBe("dropped:slack_channel_unset"); + }); + + test("deliverResult records dropped:owner_user_id_unset when owner unset and target=owner", async () => { + const slack = createMockSlackChannel(); + const job = { + name: "J", + delivery: { channel: "slack", target: "owner" }, + } as ScheduledJob; + const outcome = await deliverResult(job, "hello", { + slackChannel: slack as never, + ownerUserId: null, + }); + expect(outcome).toBe("dropped:owner_user_id_unset"); + expect(slack.sendDm).not.toHaveBeenCalled(); + }); + + test("deliverResult returns delivered for valid owner target", async () => { + const slack = createMockSlackChannel(); + const job = { + name: "J", + delivery: { channel: "slack", target: "owner" }, + } as ScheduledJob; + const outcome = await deliverResult(job, "hello", { + slackChannel: slack as never, + ownerUserId: "U_OWNER", + }); + expect(outcome).toBe("delivered"); + expect(slack.sendDm).toHaveBeenCalledWith("U_OWNER", "hello"); + }); + + test("deliverResult catches Slack errors and returns error:... outcome (throw path)", async () => { + const slack = createMockSlackChannel(); + slack.sendDm.mockImplementation(async () => { + throw new Error("Slack API is down"); + }); + const job = { + name: "J", + delivery: { channel: "slack", target: "owner" }, + } as ScheduledJob; + const outcome = await deliverResult(job, "hi", { + slackChannel: slack as never, + ownerUserId: "U_OWNER", + }); + expect(outcome).toMatch(/^error:/); + expect(outcome).toContain("Slack API is down"); + }); + + test("deliverResult records error:slack_returned_null when sendDm returns null (real outage contract)", async () => { + const slack = createMockSlackChannel(); + // Match the REAL SlackChannel.sendDm contract: it catches errors + // internally and returns null on failure, rather than throwing. + slack.sendDm.mockImplementation(async () => null); + const job = { + name: "NullOwner", + delivery: { channel: "slack", target: "owner" }, + } as ScheduledJob; + const outcome = await deliverResult(job, "hi", { + slackChannel: slack as never, + ownerUserId: "U_OWNER", + }); + expect(outcome).toBe("error:slack_returned_null"); + }); + + test("deliverResult records error:slack_returned_null when postToChannel returns null", async () => { + const slack = createMockSlackChannel(); + slack.postToChannel.mockImplementation(async () => null); + const job = { + name: "NullChannel", + delivery: { channel: "slack", target: "C04ABC123" }, + } as ScheduledJob; + const outcome = await deliverResult(job, "hi", { + slackChannel: slack as never, + ownerUserId: "U_OWNER", + }); + expect(outcome).toBe("error:slack_returned_null"); + }); + + test("deliverResult records error:slack_returned_null when sendDm to U-target returns null", async () => { + const slack = createMockSlackChannel(); + slack.sendDm.mockImplementation(async () => null); + const job = { + name: "NullUser", + delivery: { channel: "slack", target: "U04ABC123" }, + } as ScheduledJob; + const outcome = await deliverResult(job, "hi", { + slackChannel: slack as never, + ownerUserId: "U_OWNER", + }); + expect(outcome).toBe("error:slack_returned_null"); + }); + + test("executeJob persists last_delivery_status on dropped owner delivery", async () => { + const runtime = createMockRuntime(); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const slack = createMockSlackChannel(); + scheduler.setSlackChannel(slack as never, null); + + const job = scheduler.createJob({ + name: "Owner Drop", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + delivery: { channel: "slack", target: "owner" }, + }); + await scheduler.runJobNow(job.id); + + const after = scheduler.getJob(job.id); + expect(after?.lastDeliveryStatus).toBe("dropped:owner_user_id_unset"); + }); + + test("Slack outage via null return (real contract) does not kill executeJob and records error", async () => { + const runtime = createMockRuntime(); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const slack = createMockSlackChannel(); + // Match the REAL SlackChannel contract: sendDm catches internally + // and returns null on failure. This is the scenario that Phase 2.5's + // original delivery.ts missed and that stamped "delivered" in the + // database during a real Slack outage. + slack.sendDm.mockImplementation(async () => null); + scheduler.setSlackChannel(slack as never, "U_OWNER"); + + const job = scheduler.createJob({ + name: "Slack Down Null", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + const result = await scheduler.runJobNow(job.id); + expect(result).toBe("Mock response"); + + const after = scheduler.getJob(job.id); + // The whole point of the Critical-1 fix: null return MUST be recorded + // as an error outcome, NEVER as "delivered". + expect(after?.lastDeliveryStatus).toBe("error:slack_returned_null"); + expect(after?.lastDeliveryStatus).not.toBe("delivered"); + expect(after?.lastRunStatus).toBe("ok"); + }); + + test("Slack outage via thrown error does not kill executeJob and records error", async () => { + const runtime = createMockRuntime(); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const slack = createMockSlackChannel(); + // Belt-and-braces: if a future Slack layer change starts throwing + // instead of returning null, we still classify it as an error. + slack.sendDm.mockImplementation(async () => { + throw new Error("ECONNREFUSED"); + }); + scheduler.setSlackChannel(slack as never, "U_OWNER"); + + const job = scheduler.createJob({ + name: "Slack Down Throw", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + const result = await scheduler.runJobNow(job.id); + expect(result).toBe("Mock response"); + + const after = scheduler.getJob(job.id); + expect(after?.lastDeliveryStatus).toMatch(/^error:/); + expect(after?.lastDeliveryStatus).toContain("ECONNREFUSED"); + expect(after?.lastRunStatus).toBe("ok"); + }); + }); + + // ---------- M1: non-blocking missed-job recovery ---------- + + describe("M1: non-blocking missed-job recovery", () => { + test("staggerMissedJobs rewrites next_run_at instead of awaiting executeJob", () => { + const now = Date.now(); + // Insert three past-due jobs directly. + for (let i = 0; i < 3; i++) { + const past = new Date(now - (i + 1) * 60_000).toISOString(); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, next_run_at) + VALUES (?, ?, 'every', ?, 'task', ?)`, + [`job-${i}`, `Missed ${i}`, JSON.stringify({ intervalMs: 60000 }), past], + ); + } + const t0 = Date.now(); + const result = staggerMissedJobs(db, now); + const elapsed = Date.now() - t0; + + expect(result.count).toBe(3); + // Pure SQL rewrite: should be well under a second even on cold CI. + expect(elapsed).toBeLessThan(500); + + // Check that next_run_at was rewritten per-row. + const rows = db.query("SELECT next_run_at FROM scheduled_jobs ORDER BY next_run_at ASC").all() as { + next_run_at: string; + }[]; + // The three times must be monotonically increasing and staggered by 5s. + const times = rows.map((r) => new Date(r.next_run_at).getTime()); + expect(times[1] - times[0]).toBe(5_000); + expect(times[2] - times[1]).toBe(5_000); + }); + + test("start() returns in milliseconds even with many missed jobs", async () => { + // Insert 50 past-due jobs. + for (let i = 0; i < 50; i++) { + const past = new Date(Date.now() - (i + 1) * 1000).toISOString(); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, next_run_at) + VALUES (?, ?, 'every', ?, 'task', ?)`, + [`boot-${i}`, `Boot ${i}`, JSON.stringify({ intervalMs: 60000 }), past], + ); + } + const runtime = createMockRuntime(); + // Make handleMessage slow to prove we don't wait for it. + runtime.handleMessage.mockImplementation(async () => { + await new Promise((r) => setTimeout(r, 50_000)); + return { + text: "x", + sessionId: "", + cost: { totalUsd: 0, inputTokens: 0, outputTokens: 0, modelUsage: {} }, + durationMs: 50_000, + }; + }); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + const t0 = Date.now(); + await scheduler.start(); + const elapsed = Date.now() - t0; + scheduler.stop(); + // Tolerance leaves headroom for CI jitter while still catching a + // regression that falls back to sequential 5s stagger awaits. + expect(elapsed).toBeLessThan(500); + }); + }); + + // ---------- M2/M9: runJobNow guards ---------- + + describe("M2/M9: runJobNow guards", () => { + test("runJobNow rejects when status is not active", async () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const job = scheduler.createJob({ + name: "Failed Job", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + // Mark it failed directly. + db.run("UPDATE scheduled_jobs SET status = 'failed' WHERE id = ?", [job.id]); + await expect(scheduler.runJobNow(job.id)).rejects.toThrow(/status 'failed'/); + }); + + test("runJobNow rejects completed jobs", async () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const job = scheduler.createJob({ + name: "Completed Job", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + db.run("UPDATE scheduled_jobs SET status = 'completed' WHERE id = ?", [job.id]); + await expect(scheduler.runJobNow(job.id)).rejects.toThrow(/status 'completed'/); + }); + }); + + // ---------- M3: 5-part cron pin ---------- + + describe("M3: croner pinned to 5-part mode", () => { + test("computeNextRunAt rejects 6-part cron", () => { + expect(computeNextRunAt({ kind: "cron", expr: "*/30 0 9 * * *" })).toBeNull(); + }); + + test("computeNextRunAt rejects 7-part cron", () => { + expect(computeNextRunAt({ kind: "cron", expr: "*/30 0 9 * * * 2026" })).toBeNull(); + }); + + test("computeNextRunAt rejects nicknames", () => { + expect(computeNextRunAt({ kind: "cron", expr: "@daily" })).toBeNull(); + expect(computeNextRunAt({ kind: "cron", expr: "@hourly" })).toBeNull(); + }); + + test("computeNextRunAt accepts valid 5-part cron", () => { + expect(computeNextRunAt({ kind: "cron", expr: "0 9 * * 1-5" })).not.toBeNull(); + }); + }); + + // ---------- M5: /health scheduler summary ---------- + + describe("M5: scheduler health summary", () => { + test("getHealthSummary returns zero counts on empty DB", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const s = scheduler.getHealthSummary(); + expect(s.total).toBe(0); + expect(s.active).toBe(0); + expect(s.nextFireAt).toBeNull(); + }); + + test("getHealthSummary counts active, paused, failed jobs correctly", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const j1 = scheduler.createJob({ + name: "A", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + const j2 = scheduler.createJob({ + name: "B", + schedule: { kind: "every", intervalMs: 120_000 }, + task: "x", + }); + db.run("UPDATE scheduled_jobs SET status = 'failed' WHERE id = ?", [j2.id]); + // j1 is active, j2 is failed. + const s = computeHealthSummary(db); + expect(s.total).toBe(2); + expect(s.active).toBe(1); + expect(s.failed).toBe(1); + expect(s.nextFireAt).toBeTruthy(); + // nextFireAt should reference j1 (the only active row). + expect(new Date(s.nextFireAt ?? "").getTime()).toBeGreaterThan(Date.now()); + expect(j1).toBeTruthy(); + }); + + test("recentFailures counts active jobs with consecutive_errors > 0", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const j = scheduler.createJob({ + name: "Flaky", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + db.run("UPDATE scheduled_jobs SET consecutive_errors = 3 WHERE id = ?", [j.id]); + const s = scheduler.getHealthSummary(); + expect(s.recentFailures).toBe(1); + }); + }); + + // ---------- M6: cron backoff respects cadence ---------- + + describe("M6: cron error backoff respects cron cadence", () => { + test("cron job with failures picks min(backoff, next cron fire)", () => { + // Backoff: 30s for 1 consecutive error. + const backoff = computeBackoffNextRun(1); + const backoffMs = backoff.getTime() - Date.now(); + expect(backoffMs).toBeGreaterThan(29_000); + expect(backoffMs).toBeLessThan(31_000); + + // A "* * * * *" cron fires every minute; the next fire could be + // up to 60s away. The min(backoff=30s, next_cron<=60s) picks + // whichever is smaller. We assert the logic by constructing both + // and taking the min. + const nextCron = computeNextRunAt({ kind: "cron", expr: "* * * * *" }); + expect(nextCron).not.toBeNull(); + }); + }); + + // ---------- M8: rowToJob parse-error guard ---------- + + describe("M8: listJobs skips corrupt rows", () => { + test("listJobs drops rows with unknown schedule_kind and logs", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + // Insert a normal row via the public API. + scheduler.createJob({ + name: "Good", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + // Insert a corrupt row directly. + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, next_run_at) + VALUES (?, ?, ?, ?, ?, ?)`, + ["bad-id", "Corrupt", "martian", "{}", "task", new Date(Date.now() + 60_000).toISOString()], + ); + + const jobs = scheduler.listJobs(); + expect(jobs.length).toBe(1); + expect(jobs[0].name).toBe("Good"); + }); + + test("getJob returns null for a corrupt row", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task) + VALUES (?, ?, ?, ?, ?)`, + ["corrupt-id", "Bad", "future-kind", "{}", "x"], + ); + expect(scheduler.getJob("corrupt-id")).toBeNull(); + }); + }); + + // ---------- N1: duplicate name detection ---------- + + describe("N1: duplicate name detection", () => { + test("createJob throws on duplicate name", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + scheduler.createJob({ + name: "Dupe", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + expect(() => + scheduler.createJob({ + name: "Dupe", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "y", + }), + ).toThrow(/already exists/); + }); + + test("createJob throws on case-insensitive duplicate", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + scheduler.createJob({ + name: "Morning Report", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }); + expect(() => + scheduler.createJob({ + name: "morning report", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "y", + }), + ).toThrow(/already exists/); + }); + }); + + // ---------- N5: cleanup sweep ---------- + + describe("N5: cleanup sweep for old terminal rows", () => { + test("cleanupOldTerminalJobs deletes completed rows older than 30 days", () => { + const longAgo = new Date(Date.now() - 40 * 24 * 60 * 60 * 1000).toISOString(); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, status, delete_after_run, updated_at) + VALUES (?, ?, 'every', ?, 'x', 'completed', 0, ?)`, + ["old-done", "Old Done", JSON.stringify({ intervalMs: 1 }), longAgo], + ); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, status, delete_after_run, updated_at) + VALUES (?, ?, 'every', ?, 'x', 'failed', 0, ?)`, + ["old-fail", "Old Fail", JSON.stringify({ intervalMs: 1 }), longAgo], + ); + const swept = cleanupOldTerminalJobs(db); + expect(swept).toBe(2); + }); + + test("cleanupOldTerminalJobs leaves recent terminal rows and active rows alone", () => { + const recent = new Date(Date.now() - 60_000).toISOString(); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, status, delete_after_run, updated_at) + VALUES (?, ?, 'every', ?, 'x', 'completed', 0, ?)`, + ["recent-done", "Recent Done", JSON.stringify({ intervalMs: 1 }), recent], + ); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, status, delete_after_run) + VALUES (?, ?, 'every', ?, 'x', 'active', 0)`, + ["active", "Active", JSON.stringify({ intervalMs: 1 })], + ); + const swept = cleanupOldTerminalJobs(db); + expect(swept).toBe(0); + }); + }); + + // ---------- N8: task text max length ---------- + + describe("N8: task text max length", () => { + test("createJob throws on task text larger than 32 KB", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const huge = "a".repeat(33 * 1024); + expect(() => + scheduler.createJob({ + name: "Too Big", + schedule: { kind: "every", intervalMs: 60_000 }, + task: huge, + }), + ).toThrow(/exceeds/); + }); + + test("createJob accepts task text exactly at the limit", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + const exact = "a".repeat(32 * 1024); + expect(() => + scheduler.createJob({ + name: "At Limit", + schedule: { kind: "every", intervalMs: 60_000 }, + task: exact, + }), + ).not.toThrow(); + }); + }); + + // ---------- OOS#6: MAX_JOBS rate limit ---------- + + describe("OOS#6: MAX_JOBS rate limit", () => { + test("createJob throws when count exceeds MAX_JOBS", () => { + const scheduler = new Scheduler({ db, runtime: createMockRuntime() as never }); + // Seed directly to 1000 so we don't hit the 1000-create-loop test cost. + const stmt = db.prepare( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, next_run_at) + VALUES (?, ?, 'every', '{}', 'x', ?)`, + ); + const future = new Date(Date.now() + 60_000).toISOString(); + for (let i = 0; i < 1000; i++) stmt.run(`seed-${i}`, `Seed${i}`, future); + expect(() => + scheduler.createJob({ + name: "Over Limit", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "x", + }), + ).toThrow(/job limit/); + }); + }); + + // ---------- N3 regression: setTimeout int32 overflow clamp ---------- + + describe("N3 regression: armTimer clamps delay to prevent setTimeout int32 overflow", () => { + test("setTimeout is never called with a delay larger than one hour", async () => { + // Regression guard: any job whose next fire is more than ~24.8 days + // away would overflow the 32-bit setTimeout delay and cause a hot + // armTimer -> onTimer -> armTimer spin loop (Codex P1 on PR #51). + // The clamp in service.ts armTimer must apply before setTimeout. + const HOUR_MS = 60 * 60 * 1000; + const captured: number[] = []; + const originalSetTimeout = globalThis.setTimeout; + // Replace global setTimeout with a capturing wrapper. We hand work + // off to a harmless long-delay real timer (well below int32 max), + // and call stop() before any callback fires, so nothing executes. + globalThis.setTimeout = ((fn: (...args: unknown[]) => void, delay?: number, ...rest: unknown[]) => { + if (typeof delay === "number") captured.push(delay); + return originalSetTimeout(fn, 1_000_000, ...(rest as never[])); + }) as typeof setTimeout; + + try { + // 100 days in ms (~8.64e9) is vastly above the ~2.15e9 int32 + // ceiling. Without the clamp, setTimeout would receive this + // value and coerce it to roughly 1 ms. + const farFuture = new Date(Date.now() + 100 * 24 * 60 * 60 * 1000).toISOString(); + db.run( + `INSERT INTO scheduled_jobs (id, name, schedule_kind, schedule_value, task, next_run_at) + VALUES (?, ?, 'every', ?, 'task', ?)`, + ["n3-far", "FarFuture", JSON.stringify({ intervalMs: 60_000 }), farFuture], + ); + + const runtime = createMockRuntime(); + const scheduler = new Scheduler({ db, runtime: runtime as never }); + await scheduler.start(); + scheduler.stop(); + + // At least one setTimeout call happened during start() -> armTimer. + expect(captured.length).toBeGreaterThan(0); + // Every captured delay must be within the one-hour clamp, not + // the raw 100-day value. This assertion fails if the clamp is + // ever removed again. + const overLimit = captured.filter((d) => d > HOUR_MS); + expect(overLimit).toEqual([]); + } finally { + globalThis.setTimeout = originalSetTimeout; + } + }); + }); +}); + +// ---------- Runtime C2 belt: AgentRuntime isSessionBusy / Error bounce ---------- + +describe("AgentRuntime C2 belt", () => { + test("isSessionBusy reflects activeSessions entries", async () => { + const { AgentRuntime } = await import("../../agent/runtime.ts"); + const config = { + name: "test", + model: "claude-opus-4-6", + effort: "standard", + timeout_minutes: 5, + port: 0, + role: "swe", + max_budget_usd: 0, + } as never; + const db2 = new Database(":memory:"); + db2.run("PRAGMA journal_mode = WAL"); + runMigrations(db2); + const runtime = new AgentRuntime(config, db2); + expect(runtime.isSessionBusy("scheduler", "sched:foo")).toBe(false); + // activeSessions is private; exercise via handleMessage re-entry. + // We cannot easily drive a real SDK query here, so we rely on the + // service-level test above to cover the scheduler -> isSessionBusy + // interaction. + db2.close(); + }); +}); diff --git a/src/scheduler/__tests__/service.test.ts b/src/scheduler/__tests__/service.test.ts index 2e80957..de947bf 100644 --- a/src/scheduler/__tests__/service.test.ts +++ b/src/scheduler/__tests__/service.test.ts @@ -11,6 +11,7 @@ function createMockRuntime() { cost: { totalUsd: 0.01, inputTokens: 100, outputTokens: 50, modelUsage: {} }, durationMs: 500, })), + isSessionBusy: mock((_channel: string, _conversationId: string) => false), setMemoryContextBuilder: mock(() => {}), setEvolvedConfig: mock(() => {}), setRoleTemplate: mock(() => {}), diff --git a/src/scheduler/__tests__/tool.test.ts b/src/scheduler/__tests__/tool.test.ts index ee3f4d4..3d40fa4 100644 --- a/src/scheduler/__tests__/tool.test.ts +++ b/src/scheduler/__tests__/tool.test.ts @@ -12,6 +12,7 @@ function createMockRuntime() { cost: { totalUsd: 0.01, inputTokens: 100, outputTokens: 50, modelUsage: {} }, durationMs: 500, })), + isSessionBusy: mock((_channel: string, _conversationId: string) => false), setMemoryContextBuilder: mock(() => {}), setEvolvedConfig: mock(() => {}), setRoleTemplate: mock(() => {}), diff --git a/src/scheduler/create-validation.ts b/src/scheduler/create-validation.ts new file mode 100644 index 0000000..a2d8a00 --- /dev/null +++ b/src/scheduler/create-validation.ts @@ -0,0 +1,56 @@ +import type { Database } from "bun:sqlite"; +import { validateSchedule } from "./schedule.ts"; +import { type JobCreateInput, type JobDelivery, isValidSlackTarget } from "./types.ts"; + +export const MAX_JOBS = 1_000; +export const MAX_TASK_BYTES = 32 * 1024; + +/** + * All creation-time validation for a scheduled job. Throws a descriptive + * Error on any failure so the tool wrapper in tool.ts can surface it as + * isError:true. Returns the resolved delivery shape (applies defaults). + * + * Addresses C1 (schedule), C4 (delivery target), N1 (duplicate name), + * N8 (task size), OOS#6 (max jobs), and N9 (single canonical default layer). + */ +export function validateCreateInput(db: Database, input: JobCreateInput): JobDelivery { + // Rate limit: cheap insurance against a runaway agent loop. + const countRow = db.query("SELECT COUNT(*) as c FROM scheduled_jobs").get() as { c: number }; + if (countRow.c >= MAX_JOBS) { + throw new Error(`scheduler job limit reached (${MAX_JOBS}); delete unused jobs before creating more`); + } + + // Task text sanity check. + const taskBytes = Buffer.byteLength(input.task, "utf8"); + if (taskBytes > MAX_TASK_BYTES) { + throw new Error(`task text is ${taskBytes} bytes, exceeds ${MAX_TASK_BYTES} byte limit`); + } + + // Duplicate name detection (case-insensitive to match findJobIdByName). + const dupe = db.query("SELECT id FROM scheduled_jobs WHERE lower(name) = lower(?)").get(input.name) as { + id: string; + } | null; + if (dupe) { + throw new Error(`job with name "${input.name}" already exists (id: ${dupe.id})`); + } + + // Schedule validation: fail fast at the boundary so the database never + // accumulates dead-on-arrival rows with next_run_at=NULL. + const scheduleError = validateSchedule(input.schedule); + if (scheduleError) { + throw new Error(`invalid schedule: ${scheduleError}`); + } + + // Delivery target validation. Channel-id (C...) and user-id (U...) targets + // work without owner_user_id; "owner" requires owner_user_id at runtime + // (the runtime fallthrough branch records a dropped outcome if unset). + // Single canonical default layer per N9. + const delivery = input.delivery ?? { channel: "slack" as const, target: "owner" }; + if (delivery.channel === "slack" && !isValidSlackTarget(delivery.target)) { + throw new Error( + `invalid delivery.target '${delivery.target}': must be "owner", a Slack channel id (C...), or a Slack user id (U...)`, + ); + } + + return delivery; +} diff --git a/src/scheduler/delivery.ts b/src/scheduler/delivery.ts new file mode 100644 index 0000000..dc0e4e3 --- /dev/null +++ b/src/scheduler/delivery.ts @@ -0,0 +1,104 @@ +import type { SlackChannel } from "../channels/slack.ts"; +import type { ScheduledJob } from "./types.ts"; + +/** + * Outcome string stored in scheduled_jobs.last_delivery_status. + * null (column default) means "never attempted". + * Anything returned from deliverResult is a concrete attempt outcome. + */ +export type DeliveryOutcome = + | "delivered" + | "skipped:channel_none" + | "dropped:slack_channel_unset" + | "dropped:owner_user_id_unset" + | `dropped:unknown_target:${string}` + | `error:${string}`; + +export type DeliveryContext = { + slackChannel: SlackChannel | undefined; + ownerUserId: string | null; +}; + +/** + * Send the job's run text to its configured delivery target and report the + * outcome. Every exit path returns a concrete outcome so the scheduler can + * persist it and so operators never see a silently dropped message. + * + * SlackChannel.sendDm and postToChannel catch errors internally and return + * `null` on failure rather than throwing. We treat a null return as an error + * outcome so a real Slack outage surfaces as "error:slack_returned_null" + * instead of being stamped "delivered" in last_delivery_status. The try/catch + * remains as a belt-and-braces guard in case a future Slack layer change + * starts throwing instead. + * + * Target validation already happened at creation time. The runtime fallthrough + * branch here is the safety net for the "Slack configured but owner missing" + * case and for any future target shape the validator misses. + */ +export async function deliverResult(job: ScheduledJob, text: string, ctx: DeliveryContext): Promise { + if (job.delivery.channel === "none") { + return "skipped:channel_none"; + } + + if (job.delivery.channel !== "slack") { + return `dropped:unknown_target:${job.delivery.channel}`; + } + + if (!ctx.slackChannel) { + console.error( + `[scheduler] Delivery dropped for job "${job.name}": Slack channel is not wired. Configure channels.yaml with slack.enabled=true, bot_token, app_token.`, + ); + return "dropped:slack_channel_unset"; + } + + const target = job.delivery.target; + + try { + if (target === "owner") { + if (!ctx.ownerUserId) { + console.error( + `[scheduler] Delivery dropped for job "${job.name}": target=owner but channels.yaml slack.owner_user_id is not configured. Set owner_user_id or use an explicit user (U...) or channel (C...) target.`, + ); + return "dropped:owner_user_id_unset"; + } + const ts = await ctx.slackChannel.sendDm(ctx.ownerUserId, text); + if (ts === null) { + console.error( + `[scheduler] Delivery error for job "${job.name}" target=owner: Slack sendDm returned null (upstream API failure)`, + ); + return "error:slack_returned_null"; + } + return "delivered"; + } + if (target.startsWith("C")) { + const ts = await ctx.slackChannel.postToChannel(target, text); + if (ts === null) { + console.error( + `[scheduler] Delivery error for job "${job.name}" target=${target}: Slack postToChannel returned null (upstream API failure)`, + ); + return "error:slack_returned_null"; + } + return "delivered"; + } + if (target.startsWith("U")) { + const ts = await ctx.slackChannel.sendDm(target, text); + if (ts === null) { + console.error( + `[scheduler] Delivery error for job "${job.name}" target=${target}: Slack sendDm returned null (upstream API failure)`, + ); + return "error:slack_returned_null"; + } + return "delivered"; + } + + // Defensive: the creation-time validator should never let us reach here. + console.error(`[scheduler] Delivery dropped for job "${job.name}": unknown target format: ${target}`); + return `dropped:unknown_target:${target}`; + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[scheduler] Delivery error for job "${job.name}" target="${target}": ${msg}`); + // Compact the error so it fits in the status column without leaking newlines. + const compact = msg.replace(/\s+/g, " ").slice(0, 200); + return `error:${compact}`; + } +} diff --git a/src/scheduler/executor.ts b/src/scheduler/executor.ts new file mode 100644 index 0000000..c4c135a --- /dev/null +++ b/src/scheduler/executor.ts @@ -0,0 +1,139 @@ +import type { Database } from "bun:sqlite"; +import type { AgentRuntime } from "../agent/runtime.ts"; +import type { SlackChannel } from "../channels/slack.ts"; +import { type DeliveryOutcome, deliverResult } from "./delivery.ts"; +import { computeBackoffNextRun, computeNextRunAt } from "./schedule.ts"; +import { JOB_STATUS_VALUES, type ScheduledJob } from "./types.ts"; + +export const MAX_CONSECUTIVE_ERRORS = 10; + +export type ExecutorContext = { + db: Database; + runtime: AgentRuntime; + slackChannel: SlackChannel | undefined; + ownerUserId: string | null; + notifyOwner: (text: string) => void; +}; + +/** + * Run a single scheduled job end to end: runtime call, schedule advance, + * delivery, row update, optional deletion. Every exit path writes a status + * and a delivery outcome so operators see what happened. + */ +export async function executeJob(job: ScheduledJob, ctx: ExecutorContext): Promise { + // C2 braces layer: if the runtime is already executing this job's session, + // skip the fire without touching any job state. The timer will retry at + // its next wake-up. Do not increment run_count, consecutive_errors, or + // advance next_run_at: the fire never happened. + if (ctx.runtime.isSessionBusy("scheduler", `sched:${job.id}`)) { + console.warn( + `[scheduler] Skipping fire for "${job.name}" (${job.id}): previous execution still running. The next scheduled fire will retry.`, + ); + return ""; + } + + const startMs = Date.now(); + console.log(`[scheduler] Executing job: ${job.name} (${job.id})`); + + let responseText = ""; + let runStatus: "ok" | "error" = "ok"; + let errorMsg: string | null = null; + + try { + const response = await ctx.runtime.handleMessage("scheduler", `sched:${job.id}`, job.task); + responseText = response.text; + if (responseText.startsWith("Error:")) { + runStatus = "error"; + errorMsg = responseText; + } + } catch (err: unknown) { + runStatus = "error"; + errorMsg = err instanceof Error ? err.message : String(err); + responseText = `Error: ${errorMsg}`; + } + + const durationMs = Date.now() - startMs; + const newConsecErrors = runStatus === "error" ? job.consecutiveErrors + 1 : 0; + + let nextRunAt: string | null = null; + let newStatus = job.status; + + if (runStatus === "ok") { + if (job.deleteAfterRun || job.schedule.kind === "at") { + newStatus = "completed"; + } else { + const nextRun = computeNextRunAt(job.schedule); + nextRunAt = nextRun?.toISOString() ?? null; + } + } else if (newConsecErrors >= MAX_CONSECUTIVE_ERRORS) { + newStatus = "failed"; + ctx.notifyOwner( + `Scheduled task "${job.name}" has failed ${MAX_CONSECUTIVE_ERRORS} times in a row and has been disabled. Last error: ${errorMsg}`, + ); + } else if (job.schedule.kind === "at" && newConsecErrors >= 3) { + newStatus = "failed"; + } else { + // M6: cron jobs should reconnect to their cadence on recovery. Pick + // min(backoff, next cron fire) so a transient failure does not drift + // the job permanently off its schedule. + const backoffDate = computeBackoffNextRun(newConsecErrors); + if (job.schedule.kind === "cron") { + const nextCronFire = computeNextRunAt(job.schedule); + if (nextCronFire && nextCronFire.getTime() < backoffDate.getTime()) { + nextRunAt = nextCronFire.toISOString(); + } else { + nextRunAt = backoffDate.toISOString(); + } + } else { + nextRunAt = backoffDate.toISOString(); + } + } + + // Deliver first so last_delivery_status is fresh in the UPDATE. delivery + // never throws: it returns an outcome string. One Slack outage in a batch + // cannot kill subsequent jobs. + let deliveryStatus: DeliveryOutcome | null = null; + if (runStatus === "ok" && responseText) { + deliveryStatus = await deliverResult(job, responseText, { + slackChannel: ctx.slackChannel, + ownerUserId: ctx.ownerUserId, + }); + } + + // Runtime safety net for OOS#4. + if (!JOB_STATUS_VALUES.includes(newStatus)) { + throw new Error(`refusing to write invalid status '${newStatus}' for job ${job.id}`); + } + + ctx.db.run( + `UPDATE scheduled_jobs SET + last_run_at = ?, + last_run_status = ?, + last_run_duration_ms = ?, + last_run_error = ?, + last_delivery_status = COALESCE(?, last_delivery_status), + next_run_at = ?, + run_count = run_count + 1, + consecutive_errors = ?, + status = ?, + updated_at = datetime('now') + WHERE id = ?`, + [ + new Date(startMs).toISOString(), + runStatus, + durationMs, + errorMsg, + deliveryStatus, + nextRunAt, + newConsecErrors, + newStatus, + job.id, + ], + ); + + if (newStatus === "completed" && job.deleteAfterRun) { + ctx.db.run("DELETE FROM scheduled_jobs WHERE id = ?", [job.id]); + } + + return responseText; +} diff --git a/src/scheduler/health.ts b/src/scheduler/health.ts new file mode 100644 index 0000000..ef7489d --- /dev/null +++ b/src/scheduler/health.ts @@ -0,0 +1,49 @@ +import type { Database } from "bun:sqlite"; + +export type SchedulerHealthSummary = { + total: number; + active: number; + paused: number; + completed: number; + failed: number; + nextFireAt: string | null; + recentFailures: number; +}; + +/** + * Minimal health snapshot for the /health endpoint (M5). All reads are + * indexed or small aggregates so the cost is bounded regardless of job count. + */ +export function computeHealthSummary(db: Database): SchedulerHealthSummary { + const statusRows = db.query("SELECT status, COUNT(*) as c FROM scheduled_jobs GROUP BY status").all() as { + status: string; + c: number; + }[]; + + const counts = { active: 0, paused: 0, completed: 0, failed: 0 }; + let total = 0; + for (const row of statusRows) { + total += row.c; + if (row.status in counts) counts[row.status as keyof typeof counts] = row.c; + } + + const nextRow = db + .query( + "SELECT MIN(next_run_at) as next FROM scheduled_jobs WHERE enabled = 1 AND status = 'active' AND next_run_at IS NOT NULL", + ) + .get() as { next: string | null } | null; + + const failRow = db + .query("SELECT COUNT(*) as c FROM scheduled_jobs WHERE consecutive_errors > 0 AND status = 'active'") + .get() as { c: number }; + + return { + total, + active: counts.active, + paused: counts.paused, + completed: counts.completed, + failed: counts.failed, + nextFireAt: nextRow?.next ?? null, + recentFailures: failRow.c, + }; +} diff --git a/src/scheduler/recovery.ts b/src/scheduler/recovery.ts new file mode 100644 index 0000000..406add1 --- /dev/null +++ b/src/scheduler/recovery.ts @@ -0,0 +1,69 @@ +import type { Database } from "bun:sqlite"; +import type { JobRow } from "./types.ts"; + +/** + * Stagger used to space out missed-job fires after a restart. The first + * missed job fires immediately, each subsequent job fires STAGGER_MS later. + * The scheduler's onTimer loop naturally picks them up in next_run_at order. + */ +export const MISSED_JOB_STAGGER_MS = 5_000; + +export type StaggerResult = { + count: number; + firstFireAt: string | null; + lastFireAt: string | null; +}; + +/** + * Rewrite next_run_at on every past-due active job so the normal onTimer loop + * will pick them up in sequence. Replaces the old blocking sequential recovery + * loop that held up server boot for minutes (M1). This function does zero + * awaits: it is a pure SQL rewrite that returns as soon as the update is done. + */ +export function staggerMissedJobs(db: Database, nowMs: number = Date.now()): StaggerResult { + const nowIso = new Date(nowMs).toISOString(); + const missedRows = db + .query( + "SELECT id, name FROM scheduled_jobs WHERE enabled = 1 AND status = 'active' AND next_run_at < ? ORDER BY next_run_at ASC", + ) + .all(nowIso) as Pick[]; + + if (missedRows.length === 0) { + return { count: 0, firstFireAt: null, lastFireAt: null }; + } + + const update = db.prepare("UPDATE scheduled_jobs SET next_run_at = ? WHERE id = ?"); + let firstFireAt: string | null = null; + let lastFireAt: string | null = null; + + for (let i = 0; i < missedRows.length; i++) { + const fireAt = new Date(nowMs + i * MISSED_JOB_STAGGER_MS).toISOString(); + update.run(fireAt, missedRows[i].id); + if (i === 0) firstFireAt = fireAt; + lastFireAt = fireAt; + } + + console.log( + `[scheduler] Staggered ${missedRows.length} missed job(s) for recovery ` + + `(first fire ${firstFireAt}, last fire ${lastFireAt})`, + ); + + return { count: missedRows.length, firstFireAt, lastFireAt }; +} + +/** + * Delete terminal rows older than the TTL. Rows marked deleteAfterRun are + * removed inline by executeJob, so this sweep only catches completed/failed + * jobs that were created without that flag. Runs once per start(). See N5. + */ +export function cleanupOldTerminalJobs(db: Database, ttlDays = 30): number { + const cutoff = new Date(Date.now() - ttlDays * 24 * 60 * 60 * 1000).toISOString(); + const result = db.run( + "DELETE FROM scheduled_jobs WHERE status IN ('completed', 'failed') AND delete_after_run = 0 AND updated_at < ?", + [cutoff], + ); + if (result.changes > 0) { + console.log(`[scheduler] Cleanup swept ${result.changes} terminal row(s) older than ${ttlDays} days`); + } + return result.changes; +} diff --git a/src/scheduler/row-mapper.ts b/src/scheduler/row-mapper.ts new file mode 100644 index 0000000..3220f2e --- /dev/null +++ b/src/scheduler/row-mapper.ts @@ -0,0 +1,36 @@ +import { parseScheduleValue } from "./schedule.ts"; +import type { JobRow, ScheduledJob } from "./types.ts"; + +/** + * Map a raw scheduled_jobs row to the ScheduledJob shape the rest of the + * codebase consumes. Throws on unknown schedule_kind; callers in service.ts + * catch and log so a single corrupt row cannot brick the whole list. + */ +export function rowToJob(row: JobRow): ScheduledJob { + const schedule = parseScheduleValue(row.schedule_kind, row.schedule_value); + return { + id: row.id, + name: row.name, + description: row.description, + enabled: row.enabled === 1, + schedule, + task: row.task, + delivery: { + channel: row.delivery_channel as "slack" | "none", + target: row.delivery_target, + }, + status: row.status as ScheduledJob["status"], + lastRunAt: row.last_run_at, + lastRunStatus: row.last_run_status as ScheduledJob["lastRunStatus"], + lastRunDurationMs: row.last_run_duration_ms, + lastRunError: row.last_run_error, + lastDeliveryStatus: row.last_delivery_status, + nextRunAt: row.next_run_at, + runCount: row.run_count, + consecutiveErrors: row.consecutive_errors, + deleteAfterRun: row.delete_after_run === 1, + createdAt: row.created_at, + createdBy: row.created_by, + updatedAt: row.updated_at, + }; +} diff --git a/src/scheduler/schedule.ts b/src/scheduler/schedule.ts index 1bf7563..c1aa9e0 100644 --- a/src/scheduler/schedule.ts +++ b/src/scheduler/schedule.ts @@ -17,8 +17,13 @@ export function computeNextRunAt(schedule: Schedule, afterMs: number = Date.now( } case "cron": { const tz = schedule.tz || Intl.DateTimeFormat().resolvedOptions().timeZone; + // Pin to standard 5-field cron (M3). Croner's mode:"5-part" rejects + // 6/7 part expressions; we also reject nicknames like @daily because + // the tool description promises 5-field syntax and nicknames invite + // ambiguous scheduling. + if (schedule.expr.trim().startsWith("@")) return null; try { - const cron = new Cron(schedule.expr, { timezone: tz }); + const cron = new Cron(schedule.expr, { timezone: tz, mode: "5-part" }); return cron.nextRun(new Date(afterMs)); } catch { return null; @@ -58,6 +63,53 @@ export function serializeScheduleValue(schedule: Schedule): string { } } +/** + * Validate a schedule at creation time. Returns a descriptive error string + * when the schedule cannot produce a future fire time. Returns null when the + * schedule is valid and computeNextRunAt will succeed. See C1 in the audit. + */ +export function validateSchedule(schedule: Schedule): string | null { + switch (schedule.kind) { + case "at": { + const atMs = new Date(schedule.at).getTime(); + if (Number.isNaN(atMs)) { + return `invalid 'at' timestamp: ${schedule.at} (use ISO 8601 with an explicit offset, e.g. 2026-03-26T09:00:00-07:00)`; + } + if (atMs <= Date.now()) { + return `'at' timestamp is in the past: ${schedule.at}`; + } + return null; + } + case "every": { + if (!Number.isFinite(schedule.intervalMs) || schedule.intervalMs <= 0) { + return `'every' intervalMs must be positive, got ${schedule.intervalMs}`; + } + return null; + } + case "cron": { + const tz = schedule.tz || Intl.DateTimeFormat().resolvedOptions().timeZone; + if (schedule.expr.trim().startsWith("@")) { + return `cron nicknames like '${schedule.expr}' are not supported; use explicit 5-field syntax (minute hour day month day-of-week)`; + } + try { + const cron = new Cron(schedule.expr, { timezone: tz, mode: "5-part" }); + const next = cron.nextRun(new Date()); + if (!next) { + return `cron expression has no future fire: ${schedule.expr}`; + } + return null; + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + // Croner's TypeError about timezone is the common case for bad tz. + if (/timezone/i.test(msg)) { + return `invalid timezone '${tz}': ${msg}`; + } + return `invalid cron expression '${schedule.expr}': ${msg} (use 5-field cron: minute hour day month day-of-week)`; + } + } + } +} + /** Exponential backoff delays for consecutive errors */ const BACKOFF_DELAYS_MS = [ 30_000, // 1st error: 30s diff --git a/src/scheduler/service.ts b/src/scheduler/service.ts index bf05b3e..02e1d34 100644 --- a/src/scheduler/service.ts +++ b/src/scheduler/service.ts @@ -2,25 +2,37 @@ import type { Database } from "bun:sqlite"; import { randomUUID } from "node:crypto"; import type { AgentRuntime } from "../agent/runtime.ts"; import type { SlackChannel } from "../channels/slack.ts"; -import { computeBackoffNextRun, computeNextRunAt, parseScheduleValue, serializeScheduleValue } from "./schedule.ts"; +import { validateCreateInput } from "./create-validation.ts"; +import { executeJob } from "./executor.ts"; +import { type SchedulerHealthSummary, computeHealthSummary } from "./health.ts"; +import { cleanupOldTerminalJobs, staggerMissedJobs } from "./recovery.ts"; +import { rowToJob } from "./row-mapper.ts"; +import { computeNextRunAt, serializeScheduleValue } from "./schedule.ts"; import type { JobCreateInput, JobRow, ScheduledJob } from "./types.ts"; -const MAX_TIMER_MS = 60_000; -const MAX_CONSECUTIVE_ERRORS = 10; -const STARTUP_STAGGER_MS = 5_000; +// Upper bound on the setTimeout delay we pass when arming the next wake-up. +// Both Node and Bun use a 32-bit signed integer for the setTimeout delay, so +// any value above 2^31-1 ms (roughly 24.8 days) silently coerces to about one +// millisecond, which would turn armTimer -> onTimer -> armTimer into a hot +// spin loop for any job whose next fire is more than a few weeks out (long +// at-schedules, cron expressions whose next firing is far in the future, +// every-schedules with large intervals). One hour gives us a ~600x safety +// margin under the overflow boundary while keeping the idle re-arm cost to +// one indexed SQL MIN query per hour, which is effectively free. +const MAX_TIMER_MS = 60 * 60 * 1000; type SchedulerDeps = { db: Database; runtime: AgentRuntime; slackChannel?: SlackChannel; - ownerUserId?: string; + ownerUserId?: string | null; }; export class Scheduler { private db: Database; private runtime: AgentRuntime; private slackChannel: SlackChannel | undefined; - private ownerUserId: string | undefined; + private ownerUserId: string | null; private timer: ReturnType | null = null; private running = false; private executing = false; @@ -29,20 +41,28 @@ export class Scheduler { this.db = deps.db; this.runtime = deps.runtime; this.slackChannel = deps.slackChannel; - this.ownerUserId = deps.ownerUserId; + this.ownerUserId = deps.ownerUserId ?? null; } - /** Set Slack channel after construction (for lazy wiring when channels init after scheduler) */ - setSlackChannel(channel: SlackChannel, ownerUserId?: string): void { + /** + * Inject the Slack channel after construction. ownerUserId may be null + * (C3): owner-targeted delivery is skipped until ownerUserId is set, but + * channel-id (C...) and user-id (U...) targets work immediately. + */ + setSlackChannel(channel: SlackChannel, ownerUserId: string | null): void { this.slackChannel = channel; - if (ownerUserId) this.ownerUserId = ownerUserId; + this.ownerUserId = ownerUserId ?? null; } async start(): Promise { if (this.running) return; this.running = true; - await this.recoverMissedJobs(); + // Non-blocking recovery (M1): rewrite next_run_at on past-due rows and + // let the normal onTimer loop pick them up in sequence. start() returns + // in milliseconds instead of blocking boot for minutes. + staggerMissedJobs(this.db); + cleanupOldTerminalJobs(this.db); this.armTimer(); console.log("[scheduler] Started"); } @@ -61,10 +81,17 @@ export class Scheduler { } createJob(input: JobCreateInput): ScheduledJob { + // All creation validation lives in one place so the failure modes are + // obvious and the happy path in this method stays small. See C1, C4, + // N1, N8, OOS#6. + const delivery = validateCreateInput(this.db, input); + const id = randomUUID(); const scheduleValue = serializeScheduleValue(input.schedule); const nextRun = computeNextRunAt(input.schedule); - const delivery = input.delivery ?? { channel: "slack", target: "owner" }; + if (!nextRun) { + throw new Error("invalid schedule: validator passed but computeNextRunAt returned null"); + } this.db.run( `INSERT INTO scheduled_jobs (id, name, description, schedule_kind, schedule_value, task, delivery_channel, delivery_target, next_run_at, delete_after_run, created_by) @@ -78,7 +105,7 @@ export class Scheduler { input.task, delivery.channel, delivery.target, - nextRun?.toISOString() ?? null, + nextRun.toISOString(), input.deleteAfterRun ? 1 : 0, input.createdBy ?? "agent", ], @@ -87,7 +114,7 @@ export class Scheduler { this.armTimer(); const created = this.getJob(id); - if (!created) throw new Error(`Failed to create job: ${id}`); + if (!created) throw new Error(`failed to create job: ${id}`); return created; } @@ -100,23 +127,71 @@ export class Scheduler { return false; } + /** + * Defensive read: one corrupt row (a future kind, a truncated write) must + * not brick the whole list. Bad rows are logged and skipped. See M8. + */ listJobs(): ScheduledJob[] { const rows = this.db.query("SELECT * FROM scheduled_jobs ORDER BY created_at DESC").all() as JobRow[]; - return rows.map(rowToJob); + const jobs: ScheduledJob[] = []; + for (const row of rows) { + try { + jobs.push(rowToJob(row)); + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[scheduler] Failed to parse row ${row.id} (${row.name ?? "?"}): ${msg}`); + } + } + return jobs; } getJob(id: string): ScheduledJob | null { const row = this.db.query("SELECT * FROM scheduled_jobs WHERE id = ?").get(id) as JobRow | null; - return row ? rowToJob(row) : null; + if (!row) return null; + try { + return rowToJob(row); + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[scheduler] Failed to parse row ${row.id}: ${msg}`); + return null; + } } + findJobIdByName(name: string | undefined): string | undefined { + if (!name) return undefined; + const lowerName = name.toLowerCase(); + for (const job of this.listJobs()) { + if (job.name.toLowerCase() === lowerName) return job.id; + } + return undefined; + } + + /** + * Manual trigger. Respects the single-slot onTimer guard (M2) and the + * job status gate (M9). An admin override cannot resurrect a failed job. + */ async runJobNow(id: string): Promise { + if (this.executing) { + throw new Error("scheduler is currently executing another job, try again shortly"); + } const job = this.getJob(id); if (!job) throw new Error(`Job not found: ${id}`); if (!job.enabled) throw new Error(`Job is disabled: ${id}`); + if (job.status !== "active") { + throw new Error(`Job ${id} is in status '${job.status}', only active jobs can be run`); + } + + this.executing = true; + try { + return await this.runExecutor(job); + } finally { + this.executing = false; + } + } - const result = await this.executeJob(job); - return result; + /** Minimal health snapshot for the /health endpoint (M5). */ + getHealthSummary(): SchedulerHealthSummary { + return computeHealthSummary(this.db); } armTimer(): void { @@ -135,17 +210,19 @@ export class Scheduler { if (!row?.next) return; - const nextMs = new Date(row.next).getTime(); - const delay = Math.max(0, nextMs - Date.now()); + // Clamp the setTimeout delay to MAX_TIMER_MS (1 hour) to avoid the + // 32-bit overflow described on the constant: any value above ~24.8 days + // would be coerced to roughly 1 ms and hot-loop armTimer. When the next + // fire is within the clamp we wake at the exact fire time; otherwise + // we wake at the clamp, re-evaluate the MIN query, and re-arm. + const delay = Math.max(0, new Date(row.next).getTime() - Date.now()); const clamped = Math.min(delay, MAX_TIMER_MS); - this.timer = setTimeout(() => this.onTimer(), clamped); } private async onTimer(): Promise { if (!this.running) return; - // Concurrency guard: only one execution at a time if (this.executing) { this.armTimer(); return; @@ -163,9 +240,16 @@ export class Scheduler { for (const row of dueRows) { if (!this.running) break; - const job = rowToJob(row); + let job: ScheduledJob; + try { + job = rowToJob(row); + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[scheduler] Skipping unparsable row ${row.id}: ${msg}`); + continue; + } try { - await this.executeJob(job); + await this.runExecutor(job); } catch (err: unknown) { const msg = err instanceof Error ? err.message : String(err); console.error(`[scheduler] Job ${job.id} (${job.name}) failed: ${msg}`); @@ -177,98 +261,14 @@ export class Scheduler { } } - private async executeJob(job: ScheduledJob): Promise { - const startMs = Date.now(); - console.log(`[scheduler] Executing job: ${job.name} (${job.id})`); - - let responseText = ""; - let runStatus: "ok" | "error" = "ok"; - let errorMsg: string | null = null; - - try { - const response = await this.runtime.handleMessage("scheduler", `sched:${job.id}`, job.task); - responseText = response.text; - - if (responseText.startsWith("Error:")) { - runStatus = "error"; - errorMsg = responseText; - } - } catch (err: unknown) { - runStatus = "error"; - errorMsg = err instanceof Error ? err.message : String(err); - responseText = `Error: ${errorMsg}`; - } - - const durationMs = Date.now() - startMs; - const newConsecErrors = runStatus === "error" ? job.consecutiveErrors + 1 : 0; - - // Compute next run - let nextRunAt: string | null = null; - let newStatus = job.status; - - if (runStatus === "ok") { - if (job.deleteAfterRun || job.schedule.kind === "at") { - newStatus = "completed"; - } else { - const nextRun = computeNextRunAt(job.schedule); - nextRunAt = nextRun?.toISOString() ?? null; - } - } else { - // Error path - if (newConsecErrors >= MAX_CONSECUTIVE_ERRORS) { - newStatus = "failed"; - this.notifyOwner( - `Scheduled task "${job.name}" has failed ${MAX_CONSECUTIVE_ERRORS} times in a row and has been disabled. Last error: ${errorMsg}`, - ); - } else if (job.schedule.kind === "at" && newConsecErrors >= 3) { - newStatus = "failed"; - } else { - const backoffDate = computeBackoffNextRun(newConsecErrors); - nextRunAt = backoffDate.toISOString(); - } - } - - this.db.run( - `UPDATE scheduled_jobs SET - last_run_at = ?, - last_run_status = ?, - last_run_duration_ms = ?, - last_run_error = ?, - next_run_at = ?, - run_count = run_count + 1, - consecutive_errors = ?, - status = ?, - updated_at = datetime('now') - WHERE id = ?`, - [new Date(startMs).toISOString(), runStatus, durationMs, errorMsg, nextRunAt, newConsecErrors, newStatus, job.id], - ); - - // Delete completed one-shot jobs - if (newStatus === "completed" && job.deleteAfterRun) { - this.db.run("DELETE FROM scheduled_jobs WHERE id = ?", [job.id]); - } - - // Deliver result - if (runStatus === "ok" && responseText) { - await this.deliverResult(job, responseText); - } - - return responseText; - } - - private async deliverResult(job: ScheduledJob, text: string): Promise { - if (job.delivery.channel === "none") return; - - if (job.delivery.channel === "slack" && this.slackChannel) { - const target = job.delivery.target; - if (target === "owner" && this.ownerUserId) { - await this.slackChannel.sendDm(this.ownerUserId, text); - } else if (target.startsWith("C")) { - await this.slackChannel.postToChannel(target, text); - } else if (target.startsWith("U")) { - await this.slackChannel.sendDm(target, text); - } - } + private runExecutor(job: ScheduledJob): Promise { + return executeJob(job, { + db: this.db, + runtime: this.runtime, + slackChannel: this.slackChannel, + ownerUserId: this.ownerUserId, + notifyOwner: (text: string) => this.notifyOwner(text), + }); } private notifyOwner(text: string): void { @@ -277,63 +277,8 @@ export class Scheduler { const msg = err instanceof Error ? err.message : String(err); console.error(`[scheduler] Failed to notify owner: ${msg}`); }); + return; } + console.error(`[scheduler] Terminal failure notify dropped (owner unset): ${text}`); } - - private async recoverMissedJobs(): Promise { - const now = new Date().toISOString(); - const missedRows = this.db - .query( - "SELECT * FROM scheduled_jobs WHERE enabled = 1 AND status = 'active' AND next_run_at < ? ORDER BY next_run_at ASC", - ) - .all(now) as JobRow[]; - - if (missedRows.length === 0) return; - - console.log(`[scheduler] Recovering ${missedRows.length} missed job(s)`); - - for (let i = 0; i < missedRows.length; i++) { - const job = rowToJob(missedRows[i]); - - // Stagger missed job execution to avoid overload - if (i > 0) { - await new Promise((resolve) => setTimeout(resolve, STARTUP_STAGGER_MS)); - } - - try { - await this.executeJob(job); - } catch (err: unknown) { - const msg = err instanceof Error ? err.message : String(err); - console.error(`[scheduler] Recovery of ${job.name} failed: ${msg}`); - } - } - } -} - -function rowToJob(row: JobRow): ScheduledJob { - const schedule = parseScheduleValue(row.schedule_kind, row.schedule_value); - return { - id: row.id, - name: row.name, - description: row.description, - enabled: row.enabled === 1, - schedule, - task: row.task, - delivery: { - channel: row.delivery_channel as "slack" | "none", - target: row.delivery_target, - }, - status: row.status as ScheduledJob["status"], - lastRunAt: row.last_run_at, - lastRunStatus: row.last_run_status as ScheduledJob["lastRunStatus"], - lastRunDurationMs: row.last_run_duration_ms, - lastRunError: row.last_run_error, - nextRunAt: row.next_run_at, - runCount: row.run_count, - consecutiveErrors: row.consecutive_errors, - deleteAfterRun: row.delete_after_run === 1, - createdAt: row.created_at, - createdBy: row.created_by, - updatedAt: row.updated_at, - }; } diff --git a/src/scheduler/tool.ts b/src/scheduler/tool.ts index c9b874b..9447a96 100644 --- a/src/scheduler/tool.ts +++ b/src/scheduler/tool.ts @@ -14,37 +14,64 @@ function err(message: string): { content: Array<{ type: "text"; text: string }>; return { content: [{ type: "text" as const, text: JSON.stringify({ error: message }) }], isError: true }; } +const TOOL_DESCRIPTION = `Create, list, delete, or trigger scheduled tasks. Lets you set up recurring jobs, one-shot reminders, and automated reports. + +Actions: +- create: Create a new scheduled task. Returns the job id and next run time. Rejects invalid schedules, past timestamps, duplicate names, task text over 32 KB, and delivery targets that are not "owner", a channel id (C...), or a user id (U...). +- list: List all scheduled tasks with status and next run time. Corrupt rows are logged and skipped. +- delete: Remove a scheduled task by jobId or by name (case insensitive). +- run: Trigger a task immediately. Only runs when status is active and no other job is currently executing. Returns the task output. + +Schedule types: +- { kind: "at", at: "2026-03-26T09:00:00-07:00" } -> one-shot at a specific instant. Always pass an ISO 8601 timestamp with an explicit offset or a "Z" suffix; bare local times (e.g. "2026-03-26 09:00") resolve against the VM's local timezone. +- { kind: "every", intervalMs: 1800000 } -> recurring interval, counted from the end of the previous run. +- { kind: "cron", expr: "0 9 * * 1-5", tz: "America/Los_Angeles" } -> standard 5-field cron. + +Cron syntax (5 fields only, no seconds, no nicknames, no Quartz extensions): + minute hour day-of-month month day-of-week + 0-59 0-23 1-31 1-12 0-6 (0 or 7 = Sunday) +- Step: "*/10" fires every 10 units +- Range: "1-5" covers Monday through Friday when used in the day-of-week field +- Range with step: "0-30/5" fires at 0, 5, 10, ..., 30 +- Month and day-of-week name aliases: JAN..DEC, SUN..SAT +- Day-of-month and day-of-week combine with OR semantics: "0 9 1 * MON" fires on the 1st of the month AND every Monday +- Timezone: pass tz as an IANA name (e.g. America/Los_Angeles, Europe/Berlin, UTC). +- DST: during spring-forward the nonexistent local hour is remapped to the next valid moment. A cron fire at "30 2 8 3 *" in America/Los_Angeles will fire at 3:30 local on spring-forward day because 2:30 does not exist that day. + +Cron examples: +- "*/15 * * * *" -> every 15 minutes +- "0 9 * * 1-5" -> 9:00am Monday through Friday +- "30 8,12,17 * * *" -> 8:30, 12:30, 17:30 every day +- "0 0 1 * *" -> midnight on the 1st of every month +- "0 9 * * 1" -> 9am every Monday +- "0-30/5 * * * 1-5" -> every 5 minutes during the first half hour of every weekday + +Delivery: +- { channel: "slack", target: "owner" } -> DM the configured owner (default). If slack.owner_user_id is unset in channels.yaml, delivery records "dropped:owner_user_id_unset" and logs a loud error. +- { channel: "slack", target: "U04ABC123" } -> DM a specific Slack user. +- { channel: "slack", target: "C04ABC123" } -> post to a Slack channel. +- { channel: "none" } -> silent (no delivery, useful for maintenance tasks). +Anything else (e.g. "#general", "alice") is rejected at creation time. + +When creating a task, write the task prompt as a complete, self-contained instruction. Include every piece of context the scheduled run will need; it will NOT have access to the current conversation history. If a scheduled fire hits while a prior run of the same job is still executing, the scheduler skips the fire and retries at the next wake-up.`; + export function createSchedulerToolServer(scheduler: Scheduler): McpSdkServerConfigWithInstance { const scheduleTool = tool( "phantom_schedule", - `Create, list, delete, or trigger scheduled tasks. This lets you set up recurring jobs, one-shot reminders, and automated reports. - -ACTIONS: -- create: Create a new scheduled task. Returns the job ID and next run time. -- list: List all scheduled tasks with their status and next run time. -- delete: Remove a scheduled task by job ID or name. -- run: Trigger a task immediately for testing. Returns the task output. - -SCHEDULE TYPES: -- "at": One-shot at a specific time. { kind: "at", at: "2026-03-26T09:00:00-07:00" } -- "every": Recurring interval in ms. { kind: "every", intervalMs: 1800000 } (30 minutes) -- "cron": Cron expression with timezone. { kind: "cron", expr: "0 9 * * 1-5", tz: "America/Los_Angeles" } - -DELIVERY: -- { channel: "slack", target: "owner" } - DM the configured owner (default) -- { channel: "slack", target: "U04ABC123" } - DM a specific Slack user -- { channel: "slack", target: "C04ABC123" } - Post to a Slack channel -- { channel: "none" } - Silent (no delivery, useful for maintenance tasks) - -When creating a task, write the task prompt as a complete, self-contained instruction. -Include all necessary context in the task text. The scheduled run will NOT have access -to the current conversation.`, + TOOL_DESCRIPTION, { - action: z.enum(["create", "list", "delete", "run"]), + action: z + .enum(["create", "list", "delete", "run"]) + .describe( + "create: new scheduled task. list: enumerate tasks. delete: remove by jobId or name. run: trigger immediately (only when status=active and scheduler is idle).", + ), name: z.string().optional().describe("Job name (required for create)"), description: z.string().optional().describe("Job description"), schedule: ScheduleInputSchema.optional().describe("Schedule definition (required for create)"), - task: z.string().optional().describe("The prompt for the agent when the job fires (required for create)"), + task: z + .string() + .optional() + .describe("The prompt for the agent when the job fires (required for create, 32 KB max)"), delivery: JobDeliverySchema.optional().describe("Where to deliver results"), jobId: z.string().optional().describe("Job ID (for delete or run)"), }, @@ -89,14 +116,16 @@ to the current conversation.`, nextRunAt: j.nextRunAt, lastRunAt: j.lastRunAt, lastRunStatus: j.lastRunStatus, + lastDeliveryStatus: j.lastDeliveryStatus, runCount: j.runCount, + consecutiveErrors: j.consecutiveErrors, delivery: j.delivery, })), }); } case "delete": { - const targetId = input.jobId ?? findJobIdByName(scheduler, input.name); + const targetId = input.jobId ?? scheduler.findJobIdByName(input.name); if (!targetId) return err("Provide jobId or name to delete"); const deleted = scheduler.deleteJob(targetId); @@ -104,7 +133,7 @@ to the current conversation.`, } case "run": { - const targetId = input.jobId ?? findJobIdByName(scheduler, input.name); + const targetId = input.jobId ?? scheduler.findJobIdByName(input.name); if (!targetId) return err("Provide jobId or name to run"); const result = await scheduler.runJobNow(targetId); @@ -126,11 +155,3 @@ to the current conversation.`, tools: [scheduleTool], }); } - -function findJobIdByName(scheduler: Scheduler, name: string | undefined): string | undefined { - if (!name) return undefined; - const jobs = scheduler.listJobs(); - const lowerName = name.toLowerCase(); - const match = jobs.find((j) => j.name.toLowerCase() === lowerName); - return match?.id; -} diff --git a/src/scheduler/types.ts b/src/scheduler/types.ts index 4ae79c9..a7c7f68 100644 --- a/src/scheduler/types.ts +++ b/src/scheduler/types.ts @@ -5,7 +5,7 @@ export type ScheduleKind = z.infer; export const AtScheduleSchema = z.object({ kind: z.literal("at"), - at: z.string().describe("ISO 8601 timestamp"), + at: z.string().describe("ISO 8601 timestamp with explicit offset, e.g. 2026-03-26T09:00:00-07:00"), }); export const EveryScheduleSchema = z.object({ @@ -15,21 +15,28 @@ export const EveryScheduleSchema = z.object({ export const CronScheduleSchema = z.object({ kind: z.literal("cron"), - expr: z.string().describe("Cron expression (5 fields)"), + expr: z + .string() + .describe("Standard 5-field cron: minute hour day-of-month month day-of-week. No seconds, no nicknames."), tz: z.string().optional().describe("IANA timezone, e.g. America/Los_Angeles"), }); export const ScheduleSchema = z.discriminatedUnion("kind", [AtScheduleSchema, EveryScheduleSchema, CronScheduleSchema]); export type Schedule = z.infer; +// The JobDeliverySchema is the single canonical source of delivery defaults. +// service.createJob trusts the parsed shape and does not add a second fallback layer. +// See N9 in the Phase 2.5 scheduler audit for the rationale. export const JobDeliverySchema = z.object({ channel: z.enum(["slack", "none"]).default("slack"), - target: z.string().default("owner").describe('"owner" or a specific Slack user/channel ID'), + target: z.string().default("owner").describe('"owner", a Slack channel id (C...), or a Slack user id (U...)'), }); export type JobDelivery = z.infer; export type JobStatus = "active" | "paused" | "completed" | "failed"; +export const JOB_STATUS_VALUES: readonly JobStatus[] = ["active", "paused", "completed", "failed"] as const; export type RunStatus = "ok" | "error" | "skipped"; +export type DeliveryStatus = "delivered" | `dropped:${string}` | `error:${string}`; export type ScheduledJob = { id: string; @@ -44,6 +51,7 @@ export type ScheduledJob = { lastRunStatus: RunStatus | null; lastRunDurationMs: number | null; lastRunError: string | null; + lastDeliveryStatus: string | null; nextRunAt: string | null; runCount: number; consecutiveErrors: number; @@ -78,6 +86,7 @@ export type JobRow = { last_run_status: string | null; last_run_duration_ms: number | null; last_run_error: string | null; + last_delivery_status: string | null; next_run_at: string | null; run_count: number; consecutive_errors: number; @@ -86,3 +95,11 @@ export type JobRow = { created_by: string; updated_at: string; }; + +// Accepted Slack delivery targets. "owner" is a symbolic value that resolves +// at delivery time to the configured Slack owner user id. Channel ids begin +// with "C", user ids with "U". Anything else is rejected at creation time. +const SLACK_TARGET_RE = /^(?:owner|C[A-Z0-9]+|U[A-Z0-9]+)$/; +export function isValidSlackTarget(target: string): boolean { + return SLACK_TARGET_RE.test(target); +}