Skip to content

Commit 511e3a9

Browse files
waleedlatif1claude
andcommitted
fix(schedules): release lastQueuedAt lock on all exit paths to prevent stuck schedules
Multiple error/early-return paths in executeScheduleJob and executeJobInline were exiting without clearing lastQueuedAt, causing the dueFilter to permanently skip those schedules — resulting in stale "X hours ago" display for nextRunAt. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6fd8712 commit 511e3a9

File tree

2 files changed

+54
-14
lines changed

2 files changed

+54
-14
lines changed

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ import { type NextRequest, NextResponse } from 'next/server'
55
import { verifyCronAuth } from '@/lib/auth/internal'
66
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
77
import { generateRequestId } from '@/lib/core/utils/request'
8-
import { executeJobInline, executeScheduleJob } from '@/background/schedule-execution'
8+
import {
9+
executeJobInline,
10+
executeScheduleJob,
11+
releaseScheduleLock,
12+
} from '@/background/schedule-execution'
913

1014
export const dynamic = 'force-dynamic'
1115

@@ -150,6 +154,12 @@ export async function GET(request: NextRequest) {
150154
logger.error(`[${requestId}] Job execution failed for ${job.id}`, {
151155
error: error instanceof Error ? error.message : String(error),
152156
})
157+
await releaseScheduleLock(
158+
job.id,
159+
requestId,
160+
queuedAt,
161+
`Failed to release lock for job ${job.id}`
162+
)
153163
}
154164
})()
155165
})

apps/sim/background/schedule-execution.ts

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async function applyScheduleUpdate(
5151
}
5252
}
5353

54-
async function releaseScheduleLock(
54+
export async function releaseScheduleLock(
5555
scheduleId: string,
5656
requestId: string,
5757
now: Date,
@@ -400,6 +400,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
400400
{
401401
updatedAt: now,
402402
nextRunAt: nextRetryAt,
403+
lastQueuedAt: null,
403404
},
404405
requestId,
405406
`Error updating schedule ${payload.scheduleId} for rate limit`
@@ -409,18 +410,19 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
409410

410411
case 402: {
411412
logger.warn(`[${requestId}] Usage limit exceeded, scheduling next run`)
412-
const nextRunAt = await calculateNextRunFromDeployment(payload, requestId)
413-
if (nextRunAt) {
414-
await applyScheduleUpdate(
415-
payload.scheduleId,
416-
{
417-
updatedAt: now,
418-
nextRunAt,
419-
},
420-
requestId,
421-
`Error updating schedule ${payload.scheduleId} after usage limit check`
422-
)
423-
}
413+
const nextRunAt =
414+
(await calculateNextRunFromDeployment(payload, requestId)) ??
415+
new Date(now.getTime() + 60 * 60 * 1000)
416+
await applyScheduleUpdate(
417+
payload.scheduleId,
418+
{
419+
updatedAt: now,
420+
lastQueuedAt: null,
421+
nextRunAt,
422+
},
423+
requestId,
424+
`Error updating schedule ${payload.scheduleId} after usage limit check`
425+
)
424426
return
425427
}
426428

@@ -440,6 +442,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
440442
payload.scheduleId,
441443
{
442444
updatedAt: now,
445+
lastQueuedAt: null,
443446
nextRunAt,
444447
failedCount: newFailedCount,
445448
lastFailedAt: now,
@@ -456,6 +459,12 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
456459
const { actorUserId, workflowRecord } = preprocessResult
457460
if (!actorUserId || !workflowRecord) {
458461
logger.error(`[${requestId}] Missing required preprocessing data`)
462+
await releaseScheduleLock(
463+
payload.scheduleId,
464+
requestId,
465+
now,
466+
`Failed to release schedule ${payload.scheduleId} after missing preprocessing data`
467+
)
459468
return
460469
}
461470

@@ -519,6 +528,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
519528
payload.scheduleId,
520529
{
521530
updatedAt: now,
531+
lastQueuedAt: null,
522532
nextRunAt,
523533
failedCount: newFailedCount,
524534
lastFailedAt: now,
@@ -540,6 +550,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
540550
payload.scheduleId,
541551
{
542552
updatedAt: now,
553+
lastQueuedAt: null,
543554
nextRunAt: nextRetryAt,
544555
},
545556
requestId,
@@ -564,6 +575,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
564575
payload.scheduleId,
565576
{
566577
updatedAt: now,
578+
lastQueuedAt: null,
567579
nextRunAt,
568580
failedCount: newFailedCount,
569581
lastFailedAt: now,
@@ -575,6 +587,12 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
575587
}
576588
} catch (error: unknown) {
577589
logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
590+
await releaseScheduleLock(
591+
payload.scheduleId,
592+
requestId,
593+
now,
594+
`Failed to release schedule ${payload.scheduleId} after unhandled error`
595+
)
578596
}
579597
}
580598

@@ -777,13 +795,25 @@ export async function executeJobInline(payload: JobExecutionPayload) {
777795
logger.error(`[${requestId}] Job record missing required fields`, {
778796
scheduleId: payload.scheduleId,
779797
})
798+
await releaseScheduleLock(
799+
payload.scheduleId,
800+
requestId,
801+
now,
802+
`Failed to release job ${payload.scheduleId} after missing fields`
803+
)
780804
return
781805
}
782806

783807
if (jobRecord.status === 'completed') {
784808
logger.info(`[${requestId}] Job already completed, skipping`, {
785809
scheduleId: payload.scheduleId,
786810
})
811+
await releaseScheduleLock(
812+
payload.scheduleId,
813+
requestId,
814+
now,
815+
`Failed to release job ${payload.scheduleId} after completed skip`
816+
)
787817
return
788818
}
789819

0 commit comments

Comments
 (0)