Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion src/scheduler/__tests__/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ describe("Scheduler", () => {
expect(resumed).toBeNull();
});

test("resumeJob is a no-op on a non-paused job (active, failed, completed)", () => {
test("resumeJob without force is a no-op on a non-paused job (active, failed, completed)", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "ActiveJob",
Expand All @@ -431,6 +431,58 @@ describe("Scheduler", () => {
expect(stillCompleted?.status).toBe("completed");
});

test("resumeJob({force:true}) revives a failed job and resets the error counter", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "CircuitBroken",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Retry me",
});
// Mirror the executor circuit-breaker terminal state: status=failed,
// next_run_at cleared, consecutive_errors at the cap.
db.run("UPDATE scheduled_jobs SET status = 'failed', next_run_at = NULL, consecutive_errors = 10 WHERE id = ?", [
job.id,
]);
const failed = scheduler.getJob(job.id);
expect(failed?.status).toBe("failed");
expect(failed?.consecutiveErrors).toBe(10);
expect(failed?.nextRunAt).toBeNull();

const revived = scheduler.resumeJob(job.id, { force: true });
expect(revived?.status).toBe("active");
expect(revived?.consecutiveErrors).toBe(0);
expect(revived?.nextRunAt).toBeTruthy();
const nextMs = revived?.nextRunAt ? new Date(revived.nextRunAt).getTime() : 0;
expect(nextMs).toBeGreaterThan(Date.now() - 5_000);
expect(nextMs).toBeLessThan(Date.now() + 120_000);
});

test("resumeJob({force:true}) still refuses to revive a completed job", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "DoneOnce",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Done",
});
db.run("UPDATE scheduled_jobs SET status = 'completed' WHERE id = ?", [job.id]);
const result = scheduler.resumeJob(job.id, { force: true });
expect(result?.status).toBe("completed");
});

test("resumeJob({force:true}) on a paused job behaves like the unforced path", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "PausedPlusForce",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Go",
});
scheduler.pauseJob(job.id);
db.run("UPDATE scheduled_jobs SET consecutive_errors = 3 WHERE id = ?", [job.id]);
const resumed = scheduler.resumeJob(job.id, { force: true });
expect(resumed?.status).toBe("active");
expect(resumed?.consecutiveErrors).toBe(0);
});

test("createJob honors enabled=false by inserting an inactive row", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
Expand Down
34 changes: 22 additions & 12 deletions src/scheduler/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,30 @@ export class Scheduler {
}

/**
* Flip a paused job back to active. Recomputes next_run_at from the stored
* schedule so a job paused mid-interval resumes on a fresh cadence.
* Resets consecutive_errors so a job paused in its backoff fan-out gets a
* clean retry budget. Returns the updated job, or null if the id does not
* exist.
* Flip a paused or failed job back to active. Recomputes next_run_at from
* the stored schedule so a job paused mid-interval resumes on a fresh
* cadence. Resets consecutive_errors so a job paused in its backoff fan-out
* gets a clean retry budget.
*
* Transition rules:
* paused → active: always allowed.
* failed → active: requires opts.force === true. Failures the executor
* marks terminal are usually transient (model-provider rate limits, a
* brief Slack outage), but the executor cannot tell transient from
* broken. force opts in the operator: they have judged the underlying
* cause cleared. Without force this stays a no-op so an accidental
* resume call cannot bypass the circuit-breaker.
* completed → active: never. A one-shot may have already deleted itself
* inline (executor.ts, delete_after_run path); re-activating after that
* point is a sharp edge.
*
* Returns the updated job, or null if the id does not exist.
*/
resumeJob(id: string): ScheduledJob | null {
resumeJob(id: string, opts?: { force?: boolean }): ScheduledJob | null {
const job = this.getJob(id);
if (!job) return null;
// Only paused jobs may be resumed. Failed and completed are terminal
// states; force-reviving them would bypass the lifecycle (e.g.,
// re-running a one-shot that already deleted itself, or restarting a
// circuit-broken job without addressing the failure).
if (job.status !== "paused") return job;
const allowed = job.status === "paused" || (job.status === "failed" && opts?.force === true);
if (!allowed) return job;
const nextRun = computeNextRunAt(job.schedule);
const nextRunIso = nextRun ? nextRun.toISOString() : null;
this.db.run(
Expand All @@ -173,7 +183,7 @@ export class Scheduler {
next_run_at = ?,
consecutive_errors = 0,
updated_at = datetime('now')
WHERE id = ? AND status = 'paused'`,
WHERE id = ? AND status IN ('paused', 'failed')`,
[nextRunIso, id],
);
this.armTimer();
Expand Down
57 changes: 57 additions & 0 deletions src/ui/api/__tests__/scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,63 @@ describe("scheduler API", () => {
expect(res.status).toBe(400);
});

test("POST /:id/resume on a failed job without force returns 409", async () => {
const job = scheduler.createJob({
name: "broken-circuit",
schedule: { kind: "every", intervalMs: 60_000 },
task: "fire",
});
db.run("UPDATE scheduled_jobs SET status = 'failed' WHERE id = ?", [job.id]);
const res = await handleUiRequest(req(`/ui/api/scheduler/${job.id}/resume`, { method: "POST" }));
expect(res.status).toBe(409);
const body = (await res.json()) as { error: string };
expect(body.error).toMatch(/force/i);
const stillFailed = scheduler.getJob(job.id);
expect(stillFailed?.status).toBe("failed");
});

test("POST /:id/resume on a failed job with {force:true} revives it", async () => {
const job = scheduler.createJob({
name: "rate-limited",
schedule: { kind: "every", intervalMs: 60_000 },
task: "fire",
});
db.run("UPDATE scheduled_jobs SET status = 'failed', next_run_at = NULL, consecutive_errors = 10 WHERE id = ?", [
job.id,
]);
const res = await handleUiRequest(
req(`/ui/api/scheduler/${job.id}/resume`, {
method: "POST",
body: JSON.stringify({ force: true }),
}),
);
expect(res.status).toBe(200);
const body = (await res.json()) as { job: { status: string; consecutiveErrors: number } };
expect(body.job.status).toBe("active");
expect(body.job.consecutiveErrors).toBe(0);
const audit = db
.query(
"SELECT action, previous_status, new_status FROM scheduler_audit_log WHERE job_id = ? ORDER BY id DESC LIMIT 1",
)
.get(job.id) as { action: string; previous_status: string; new_status: string };
expect(audit.action).toBe("resume");
expect(audit.previous_status).toBe("failed");
expect(audit.new_status).toBe("active");
});

test("POST /:id/resume tolerates an empty body for the paused → active path", async () => {
const job = scheduler.createJob({
name: "paused-no-body",
schedule: { kind: "every", intervalMs: 60_000 },
task: "fire",
});
scheduler.pauseJob(job.id);
const res = await handleUiRequest(req(`/ui/api/scheduler/${job.id}/resume`, { method: "POST" }));
expect(res.status).toBe(200);
const body = (await res.json()) as { job: { status: string } };
expect(body.job.status).toBe("active");
});

test("GET /:id/audit returns entries in descending order", async () => {
const job = scheduler.createJob({
name: "audited",
Expand Down
31 changes: 28 additions & 3 deletions src/ui/api/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const PreviewSchema = z.object({
schedule: ScheduleInputSchema,
});

const ResumeSchema = z.object({
force: z.boolean().optional(),
});

function json(body: unknown, init?: ResponseInit): Response {
return new Response(JSON.stringify(body), {
...init,
Expand Down Expand Up @@ -268,10 +272,31 @@ function handlePause(deps: SchedulerApiDeps, id: string): Response {
return json({ job: updated });
}

function handleResume(deps: SchedulerApiDeps, id: string): Response {
async function handleResume(req: Request, deps: SchedulerApiDeps, id: string): Promise<Response> {
const before = deps.scheduler.getJob(id);
if (!before) return errJson("Job not found", 404);
const updated = deps.scheduler.resumeJob(id);

// Body is optional for the paused → active path (backward compat with
// existing clients that POST with no body). Parse only when one is given.
let force = false;
const raw = await req.text();
if (raw.trim().length > 0) {
let parsed: unknown;
try {
parsed = JSON.parse(raw);
} catch {
return errJson("Invalid JSON body", 400);
}
const result = ResumeSchema.safeParse(parsed);
if (!result.success) return errJson(zodErrorMessage(result.error), 400);
force = result.data.force === true;
}

if (before.status === "failed" && !force) {
return errJson("Job is in status 'failed'. Pass {\"force\": true} to revive after investigating the failure.", 409);
}

const updated = deps.scheduler.resumeJob(id, { force });
if (!updated) return errJson("Job not found", 404);
writeAudit(deps.db, {
jobId: updated.id,
Expand Down Expand Up @@ -357,7 +382,7 @@ export async function handleSchedulerApi(req: Request, url: URL, deps: Scheduler
const resumeMatch = pathname.match(/^\/ui\/api\/scheduler\/([^/]+)\/resume$/);
if (resumeMatch) {
if (req.method !== "POST") return errJson("Method not allowed", 405);
return handleResume(deps, resumeMatch[1]);
return handleResume(req, deps, resumeMatch[1]);
}

const runMatch = pathname.match(/^\/ui\/api\/scheduler\/([^/]+)\/run$/);
Expand Down
Loading