From c354b8db9d1bec675b50618d3f94863976173fee Mon Sep 17 00:00:00 2001 From: truffle Date: Wed, 20 May 2026 07:18:07 +0000 Subject: [PATCH] feat(scheduler): force-resume gate for failed jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rate-limit storms (and any sustained error) drive a scheduled job to status=failed with next_run_at=NULL once MAX_CONSECUTIVE_ERRORS = 10 trips. resumeJob refused to touch the row, so recovery meant a direct SQLite UPDATE on the live DB. Add an opt-in revival path: scheduler.resumeJob(id, { force: true }) POST /ui/api/scheduler/:id/resume { "force": true } The HTTP path returns 409 with a force-prompt message when the caller omits force on a failed job, so the circuit-breaker still defaults to no-op. paused → active stays unchanged; completed → active stays forbidden even with force (one-shots may have already self-deleted). Closes #128 --- src/scheduler/__tests__/service.test.ts | 54 ++++++++++++++++++++++- src/scheduler/service.ts | 34 +++++++++------ src/ui/api/__tests__/scheduler.test.ts | 57 +++++++++++++++++++++++++ src/ui/api/scheduler.ts | 31 ++++++++++++-- 4 files changed, 160 insertions(+), 16 deletions(-) diff --git a/src/scheduler/__tests__/service.test.ts b/src/scheduler/__tests__/service.test.ts index d73f06f5..4a8c3678 100644 --- a/src/scheduler/__tests__/service.test.ts +++ b/src/scheduler/__tests__/service.test.ts @@ -410,7 +410,7 @@ describe("Scheduler", () => { expect(resumed).toBeNull(); }); - test("resumeJob is a no-op on a non-paused job (active, failed, completed)", () => { + test("resumeJob without force is a no-op on a non-paused job (active, failed, completed)", () => { const scheduler = new Scheduler({ db, runtime: mockRuntime as never }); const job = scheduler.createJob({ name: "ActiveJob", @@ -431,6 +431,58 @@ describe("Scheduler", () => { expect(stillCompleted?.status).toBe("completed"); }); + test("resumeJob({force:true}) revives a failed job and resets the error counter", () => { + const scheduler = new Scheduler({ db, runtime: mockRuntime as never }); + const job = scheduler.createJob({ + name: "CircuitBroken", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "Retry me", + }); + // Mirror the executor circuit-breaker terminal state: status=failed, + // next_run_at cleared, consecutive_errors at the cap. + db.run("UPDATE scheduled_jobs SET status = 'failed', next_run_at = NULL, consecutive_errors = 10 WHERE id = ?", [ + job.id, + ]); + const failed = scheduler.getJob(job.id); + expect(failed?.status).toBe("failed"); + expect(failed?.consecutiveErrors).toBe(10); + expect(failed?.nextRunAt).toBeNull(); + + const revived = scheduler.resumeJob(job.id, { force: true }); + expect(revived?.status).toBe("active"); + expect(revived?.consecutiveErrors).toBe(0); + expect(revived?.nextRunAt).toBeTruthy(); + const nextMs = revived?.nextRunAt ? new Date(revived.nextRunAt).getTime() : 0; + expect(nextMs).toBeGreaterThan(Date.now() - 5_000); + expect(nextMs).toBeLessThan(Date.now() + 120_000); + }); + + test("resumeJob({force:true}) still refuses to revive a completed job", () => { + const scheduler = new Scheduler({ db, runtime: mockRuntime as never }); + const job = scheduler.createJob({ + name: "DoneOnce", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "Done", + }); + db.run("UPDATE scheduled_jobs SET status = 'completed' WHERE id = ?", [job.id]); + const result = scheduler.resumeJob(job.id, { force: true }); + expect(result?.status).toBe("completed"); + }); + + test("resumeJob({force:true}) on a paused job behaves like the unforced path", () => { + const scheduler = new Scheduler({ db, runtime: mockRuntime as never }); + const job = scheduler.createJob({ + name: "PausedPlusForce", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "Go", + }); + scheduler.pauseJob(job.id); + db.run("UPDATE scheduled_jobs SET consecutive_errors = 3 WHERE id = ?", [job.id]); + const resumed = scheduler.resumeJob(job.id, { force: true }); + expect(resumed?.status).toBe("active"); + expect(resumed?.consecutiveErrors).toBe(0); + }); + test("createJob honors enabled=false by inserting an inactive row", () => { const scheduler = new Scheduler({ db, runtime: mockRuntime as never }); const job = scheduler.createJob({ diff --git a/src/scheduler/service.ts b/src/scheduler/service.ts index 807fd231..40b89307 100644 --- a/src/scheduler/service.ts +++ b/src/scheduler/service.ts @@ -151,20 +151,30 @@ export class Scheduler { } /** - * Flip a paused job back to active. Recomputes next_run_at from the stored - * schedule so a job paused mid-interval resumes on a fresh cadence. - * Resets consecutive_errors so a job paused in its backoff fan-out gets a - * clean retry budget. Returns the updated job, or null if the id does not - * exist. + * Flip a paused or failed job back to active. Recomputes next_run_at from + * the stored schedule so a job paused mid-interval resumes on a fresh + * cadence. Resets consecutive_errors so a job paused in its backoff fan-out + * gets a clean retry budget. + * + * Transition rules: + * paused → active: always allowed. + * failed → active: requires opts.force === true. Failures the executor + * marks terminal are usually transient (model-provider rate limits, a + * brief Slack outage), but the executor cannot tell transient from + * broken. force opts in the operator: they have judged the underlying + * cause cleared. Without force this stays a no-op so an accidental + * resume call cannot bypass the circuit-breaker. + * completed → active: never. A one-shot may have already deleted itself + * inline (executor.ts, delete_after_run path); re-activating after that + * point is a sharp edge. + * + * Returns the updated job, or null if the id does not exist. */ - resumeJob(id: string): ScheduledJob | null { + resumeJob(id: string, opts?: { force?: boolean }): ScheduledJob | null { const job = this.getJob(id); if (!job) return null; - // Only paused jobs may be resumed. Failed and completed are terminal - // states; force-reviving them would bypass the lifecycle (e.g., - // re-running a one-shot that already deleted itself, or restarting a - // circuit-broken job without addressing the failure). - if (job.status !== "paused") return job; + const allowed = job.status === "paused" || (job.status === "failed" && opts?.force === true); + if (!allowed) return job; const nextRun = computeNextRunAt(job.schedule); const nextRunIso = nextRun ? nextRun.toISOString() : null; this.db.run( @@ -173,7 +183,7 @@ export class Scheduler { next_run_at = ?, consecutive_errors = 0, updated_at = datetime('now') - WHERE id = ? AND status = 'paused'`, + WHERE id = ? AND status IN ('paused', 'failed')`, [nextRunIso, id], ); this.armTimer(); diff --git a/src/ui/api/__tests__/scheduler.test.ts b/src/ui/api/__tests__/scheduler.test.ts index b60127ee..64324c4e 100644 --- a/src/ui/api/__tests__/scheduler.test.ts +++ b/src/ui/api/__tests__/scheduler.test.ts @@ -337,6 +337,63 @@ describe("scheduler API", () => { expect(res.status).toBe(400); }); + test("POST /:id/resume on a failed job without force returns 409", async () => { + const job = scheduler.createJob({ + name: "broken-circuit", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "fire", + }); + db.run("UPDATE scheduled_jobs SET status = 'failed' WHERE id = ?", [job.id]); + const res = await handleUiRequest(req(`/ui/api/scheduler/${job.id}/resume`, { method: "POST" })); + expect(res.status).toBe(409); + const body = (await res.json()) as { error: string }; + expect(body.error).toMatch(/force/i); + const stillFailed = scheduler.getJob(job.id); + expect(stillFailed?.status).toBe("failed"); + }); + + test("POST /:id/resume on a failed job with {force:true} revives it", async () => { + const job = scheduler.createJob({ + name: "rate-limited", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "fire", + }); + db.run("UPDATE scheduled_jobs SET status = 'failed', next_run_at = NULL, consecutive_errors = 10 WHERE id = ?", [ + job.id, + ]); + const res = await handleUiRequest( + req(`/ui/api/scheduler/${job.id}/resume`, { + method: "POST", + body: JSON.stringify({ force: true }), + }), + ); + expect(res.status).toBe(200); + const body = (await res.json()) as { job: { status: string; consecutiveErrors: number } }; + expect(body.job.status).toBe("active"); + expect(body.job.consecutiveErrors).toBe(0); + const audit = db + .query( + "SELECT action, previous_status, new_status FROM scheduler_audit_log WHERE job_id = ? ORDER BY id DESC LIMIT 1", + ) + .get(job.id) as { action: string; previous_status: string; new_status: string }; + expect(audit.action).toBe("resume"); + expect(audit.previous_status).toBe("failed"); + expect(audit.new_status).toBe("active"); + }); + + test("POST /:id/resume tolerates an empty body for the paused → active path", async () => { + const job = scheduler.createJob({ + name: "paused-no-body", + schedule: { kind: "every", intervalMs: 60_000 }, + task: "fire", + }); + scheduler.pauseJob(job.id); + const res = await handleUiRequest(req(`/ui/api/scheduler/${job.id}/resume`, { method: "POST" })); + expect(res.status).toBe(200); + const body = (await res.json()) as { job: { status: string } }; + expect(body.job.status).toBe("active"); + }); + test("GET /:id/audit returns entries in descending order", async () => { const job = scheduler.createJob({ name: "audited", diff --git a/src/ui/api/scheduler.ts b/src/ui/api/scheduler.ts index 5cac61bd..e915f48c 100644 --- a/src/ui/api/scheduler.ts +++ b/src/ui/api/scheduler.ts @@ -56,6 +56,10 @@ const PreviewSchema = z.object({ schedule: ScheduleInputSchema, }); +const ResumeSchema = z.object({ + force: z.boolean().optional(), +}); + function json(body: unknown, init?: ResponseInit): Response { return new Response(JSON.stringify(body), { ...init, @@ -268,10 +272,31 @@ function handlePause(deps: SchedulerApiDeps, id: string): Response { return json({ job: updated }); } -function handleResume(deps: SchedulerApiDeps, id: string): Response { +async function handleResume(req: Request, deps: SchedulerApiDeps, id: string): Promise { const before = deps.scheduler.getJob(id); if (!before) return errJson("Job not found", 404); - const updated = deps.scheduler.resumeJob(id); + + // Body is optional for the paused → active path (backward compat with + // existing clients that POST with no body). Parse only when one is given. + let force = false; + const raw = await req.text(); + if (raw.trim().length > 0) { + let parsed: unknown; + try { + parsed = JSON.parse(raw); + } catch { + return errJson("Invalid JSON body", 400); + } + const result = ResumeSchema.safeParse(parsed); + if (!result.success) return errJson(zodErrorMessage(result.error), 400); + force = result.data.force === true; + } + + if (before.status === "failed" && !force) { + return errJson("Job is in status 'failed'. Pass {\"force\": true} to revive after investigating the failure.", 409); + } + + const updated = deps.scheduler.resumeJob(id, { force }); if (!updated) return errJson("Job not found", 404); writeAudit(deps.db, { jobId: updated.id, @@ -357,7 +382,7 @@ export async function handleSchedulerApi(req: Request, url: URL, deps: Scheduler const resumeMatch = pathname.match(/^\/ui\/api\/scheduler\/([^/]+)\/resume$/); if (resumeMatch) { if (req.method !== "POST") return errJson("Method not allowed", 405); - return handleResume(deps, resumeMatch[1]); + return handleResume(req, deps, resumeMatch[1]); } const runMatch = pathname.match(/^\/ui\/api\/scheduler\/([^/]+)\/run$/);