11import { db , workflowDeploymentVersion , workflowSchedule } from '@sim/db'
22import { createLogger } from '@sim/logger'
3- import { and , eq , isNull , lt , lte , not , or , sql } from 'drizzle-orm'
3+ import { and , eq , isNull , lt , lte , ne , not , or , sql } from 'drizzle-orm'
44import { type NextRequest , NextResponse } from 'next/server'
55import { verifyCronAuth } from '@/lib/auth/internal'
66import { getJobQueue , shouldExecuteInline } from '@/lib/core/async-jobs'
77import { generateRequestId } from '@/lib/core/utils/request'
8- import { executeScheduleJob } from '@/background/schedule-execution'
8+ import { executeJobInline , executeScheduleJob } from '@/background/schedule-execution'
99
1010export const dynamic = 'force-dynamic'
1111
1212const logger = createLogger ( 'ScheduledExecuteAPI' )
1313
14+ const dueFilter = ( queuedAt : Date ) =>
15+ and (
16+ lte ( workflowSchedule . nextRunAt , queuedAt ) ,
17+ not ( eq ( workflowSchedule . status , 'disabled' ) ) ,
18+ ne ( workflowSchedule . status , 'completed' ) ,
19+ or (
20+ isNull ( workflowSchedule . lastQueuedAt ) ,
21+ lt ( workflowSchedule . lastQueuedAt , workflowSchedule . nextRunAt )
22+ )
23+ )
24+
1425export async function GET ( request : NextRequest ) {
1526 const requestId = generateRequestId ( )
1627 logger . info ( `[${ requestId } ] Scheduled execution triggered at ${ new Date ( ) . toISOString ( ) } ` )
@@ -23,20 +34,14 @@ export async function GET(request: NextRequest) {
2334 const queuedAt = new Date ( )
2435
2536 try {
37+ // Workflow schedules (require active deployment)
2638 const dueSchedules = await db
2739 . update ( workflowSchedule )
28- . set ( {
29- lastQueuedAt : queuedAt ,
30- updatedAt : queuedAt ,
31- } )
40+ . set ( { lastQueuedAt : queuedAt , updatedAt : queuedAt } )
3241 . where (
3342 and (
34- lte ( workflowSchedule . nextRunAt , queuedAt ) ,
35- not ( eq ( workflowSchedule . status , 'disabled' ) ) ,
36- or (
37- isNull ( workflowSchedule . lastQueuedAt ) ,
38- lt ( workflowSchedule . lastQueuedAt , workflowSchedule . nextRunAt )
39- ) ,
43+ dueFilter ( queuedAt ) ,
44+ or ( eq ( workflowSchedule . sourceType , 'workflow' ) , isNull ( workflowSchedule . sourceType ) ) ,
4045 sql `${ workflowSchedule . deploymentVersionId } = (select ${ workflowDeploymentVersion . id } from ${ workflowDeploymentVersion } where ${ workflowDeploymentVersion . workflowId } = ${ workflowSchedule . workflowId } and ${ workflowDeploymentVersion . isActive } = true)`
4146 )
4247 )
@@ -49,18 +54,33 @@ export async function GET(request: NextRequest) {
4954 failedCount : workflowSchedule . failedCount ,
5055 nextRunAt : workflowSchedule . nextRunAt ,
5156 lastQueuedAt : workflowSchedule . lastQueuedAt ,
57+ sourceType : workflowSchedule . sourceType ,
5258 } )
5359
54- logger . info ( `[${ requestId } ] Processing ${ dueSchedules . length } due scheduled workflows` )
60+ // Jobs (no deployment, dispatch inline)
61+ const dueJobs = await db
62+ . update ( workflowSchedule )
63+ . set ( { lastQueuedAt : queuedAt , updatedAt : queuedAt } )
64+ . where ( and ( dueFilter ( queuedAt ) , eq ( workflowSchedule . sourceType , 'job' ) ) )
65+ . returning ( {
66+ id : workflowSchedule . id ,
67+ cronExpression : workflowSchedule . cronExpression ,
68+ failedCount : workflowSchedule . failedCount ,
69+ lastQueuedAt : workflowSchedule . lastQueuedAt ,
70+ sourceType : workflowSchedule . sourceType ,
71+ } )
72+
73+ const totalCount = dueSchedules . length + dueJobs . length
74+ logger . info ( `[${ requestId } ] Processing ${ totalCount } due items (${ dueSchedules . length } schedules, ${ dueJobs . length } jobs)` )
5575
5676 const jobQueue = await getJobQueue ( )
5777
58- const queuePromises = dueSchedules . map ( async ( schedule ) => {
78+ const schedulePromises = dueSchedules . map ( async ( schedule ) => {
5979 const queueTime = schedule . lastQueuedAt ?? queuedAt
6080
6181 const payload = {
6282 scheduleId : schedule . id ,
63- workflowId : schedule . workflowId ,
83+ workflowId : schedule . workflowId ! ,
6484 blockId : schedule . blockId || undefined ,
6585 cronExpression : schedule . cronExpression || undefined ,
6686 lastRanAt : schedule . lastRanAt ?. toISOString ( ) ,
@@ -111,13 +131,34 @@ export async function GET(request: NextRequest) {
111131 }
112132 } )
113133
114- await Promise . allSettled ( queuePromises )
134+ // Jobs always execute inline (no TriggerDev)
135+ const jobPromises = dueJobs . map ( async ( job ) => {
136+ const queueTime = job . lastQueuedAt ?? queuedAt
137+ const payload = {
138+ scheduleId : job . id ,
139+ cronExpression : job . cronExpression || undefined ,
140+ failedCount : job . failedCount || 0 ,
141+ now : queueTime . toISOString ( ) ,
142+ }
143+
144+ void ( async ( ) => {
145+ try {
146+ await executeJobInline ( payload )
147+ } catch ( error ) {
148+ logger . error ( `[${ requestId } ] Job execution failed for ${ job . id } ` , {
149+ error : error instanceof Error ? error . message : String ( error ) ,
150+ } )
151+ }
152+ } ) ( )
153+ } )
154+
155+ await Promise . allSettled ( [ ...schedulePromises , ...jobPromises ] )
115156
116- logger . info ( `[${ requestId } ] Queued ${ dueSchedules . length } schedule executions ` )
157+ logger . info ( `[${ requestId } ] Processed ${ totalCount } items ` )
117158
118159 return NextResponse . json ( {
119160 message : 'Scheduled workflow executions processed' ,
120- executedCount : dueSchedules . length ,
161+ executedCount : totalCount ,
121162 } )
122163 } catch ( error : any ) {
123164 logger . error ( `[${ requestId } ] Error in scheduled execution handler` , error )
0 commit comments