Skip to content

Commit 8d846c5

Browse files
waleedlatif1claude
andauthored
feat(async-jobs): async execution with job queue backends (#3134)
* feat(async-jobs): async execution with job queue backends * added migration * remove unused envvar, remove extraneous comments * ack comment * same for db * added dedicated async envvars for timeouts, updated helm * updated comment * ack comment * migrated routes to be more restful * ack comments --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 362f4c2 commit 8d846c5

File tree

37 files changed

+12093
-1002
lines changed

37 files changed

+12093
-1002
lines changed

apps/sim/app/api/creators/[id]/route.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const UpdateCreatorProfileSchema = z.object({
2121
name: z.string().min(1, 'Name is required').max(100, 'Max 100 characters').optional(),
2222
profileImageUrl: z.string().optional().or(z.literal('')),
2323
details: CreatorProfileDetailsSchema.optional(),
24+
verified: z.boolean().optional(), // Verification status (super users only)
2425
})
2526

2627
// Helper to check if user has permission to manage profile
@@ -97,11 +98,29 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
9798
return NextResponse.json({ error: 'Profile not found' }, { status: 404 })
9899
}
99100

100-
// Check permissions
101-
const canEdit = await hasPermission(session.user.id, existing[0])
102-
if (!canEdit) {
103-
logger.warn(`[${requestId}] User denied permission to update profile: ${id}`)
104-
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
101+
// Verification changes require super user permission
102+
if (data.verified !== undefined) {
103+
const { verifyEffectiveSuperUser } = await import('@/lib/templates/permissions')
104+
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
105+
if (!effectiveSuperUser) {
106+
logger.warn(`[${requestId}] Non-super user attempted to change creator verification: ${id}`)
107+
return NextResponse.json(
108+
{ error: 'Only super users can change verification status' },
109+
{ status: 403 }
110+
)
111+
}
112+
}
113+
114+
// For non-verified updates, check regular permissions
115+
const hasNonVerifiedUpdates =
116+
data.name !== undefined || data.profileImageUrl !== undefined || data.details !== undefined
117+
118+
if (hasNonVerifiedUpdates) {
119+
const canEdit = await hasPermission(session.user.id, existing[0])
120+
if (!canEdit) {
121+
logger.warn(`[${requestId}] User denied permission to update profile: ${id}`)
122+
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
123+
}
105124
}
106125

107126
const updateData: any = {
@@ -111,6 +130,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
111130
if (data.name !== undefined) updateData.name = data.name
112131
if (data.profileImageUrl !== undefined) updateData.profileImageUrl = data.profileImageUrl
113132
if (data.details !== undefined) updateData.details = data.details
133+
if (data.verified !== undefined) updateData.verified = data.verified
114134

115135
const updated = await db
116136
.update(templateCreators)

apps/sim/app/api/creators/[id]/verify/route.ts

Lines changed: 0 additions & 113 deletions
This file was deleted.

apps/sim/app/api/cron/cleanup-stale-executions/route.ts

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import { db } from '@sim/db'
1+
import { asyncJobs, db } from '@sim/db'
22
import { workflowExecutionLogs } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
4-
import { and, eq, lt, sql } from 'drizzle-orm'
4+
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
55
import { type NextRequest, NextResponse } from 'next/server'
66
import { verifyCronAuth } from '@/lib/auth/internal'
7+
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
78
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
89

910
const logger = createLogger('CleanupStaleExecutions')
@@ -80,12 +81,102 @@ export async function GET(request: NextRequest) {
8081

8182
logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`)
8283

84+
// Clean up stale async jobs (stuck in processing)
85+
let asyncJobsMarkedFailed = 0
86+
87+
try {
88+
const staleAsyncJobs = await db
89+
.update(asyncJobs)
90+
.set({
91+
status: JOB_STATUS.FAILED,
92+
completedAt: new Date(),
93+
error: `Job terminated: stuck in processing for more than ${STALE_THRESHOLD_MINUTES} minutes`,
94+
updatedAt: new Date(),
95+
})
96+
.where(
97+
and(eq(asyncJobs.status, JOB_STATUS.PROCESSING), lt(asyncJobs.startedAt, staleThreshold))
98+
)
99+
.returning({ id: asyncJobs.id })
100+
101+
asyncJobsMarkedFailed = staleAsyncJobs.length
102+
if (asyncJobsMarkedFailed > 0) {
103+
logger.info(`Marked ${asyncJobsMarkedFailed} stale async jobs as failed`)
104+
}
105+
} catch (error) {
106+
logger.error('Failed to clean up stale async jobs:', {
107+
error: error instanceof Error ? error.message : String(error),
108+
})
109+
}
110+
111+
// Clean up stale pending jobs (never started, e.g., due to server crash before startJob())
112+
let stalePendingJobsMarkedFailed = 0
113+
114+
try {
115+
const stalePendingJobs = await db
116+
.update(asyncJobs)
117+
.set({
118+
status: JOB_STATUS.FAILED,
119+
completedAt: new Date(),
120+
error: `Job terminated: stuck in pending state for more than ${STALE_THRESHOLD_MINUTES} minutes (never started)`,
121+
updatedAt: new Date(),
122+
})
123+
.where(
124+
and(eq(asyncJobs.status, JOB_STATUS.PENDING), lt(asyncJobs.createdAt, staleThreshold))
125+
)
126+
.returning({ id: asyncJobs.id })
127+
128+
stalePendingJobsMarkedFailed = stalePendingJobs.length
129+
if (stalePendingJobsMarkedFailed > 0) {
130+
logger.info(`Marked ${stalePendingJobsMarkedFailed} stale pending jobs as failed`)
131+
}
132+
} catch (error) {
133+
logger.error('Failed to clean up stale pending jobs:', {
134+
error: error instanceof Error ? error.message : String(error),
135+
})
136+
}
137+
138+
// Delete completed/failed jobs older than retention period
139+
const retentionThreshold = new Date(Date.now() - JOB_RETENTION_HOURS * 60 * 60 * 1000)
140+
let asyncJobsDeleted = 0
141+
142+
try {
143+
const deletedJobs = await db
144+
.delete(asyncJobs)
145+
.where(
146+
and(
147+
inArray(asyncJobs.status, [JOB_STATUS.COMPLETED, JOB_STATUS.FAILED]),
148+
lt(asyncJobs.completedAt, retentionThreshold)
149+
)
150+
)
151+
.returning({ id: asyncJobs.id })
152+
153+
asyncJobsDeleted = deletedJobs.length
154+
if (asyncJobsDeleted > 0) {
155+
logger.info(
156+
`Deleted ${asyncJobsDeleted} old async jobs (retention: ${JOB_RETENTION_HOURS}h)`
157+
)
158+
}
159+
} catch (error) {
160+
logger.error('Failed to delete old async jobs:', {
161+
error: error instanceof Error ? error.message : String(error),
162+
})
163+
}
164+
83165
return NextResponse.json({
84166
success: true,
85-
found: staleExecutions.length,
86-
cleaned,
87-
failed,
88-
thresholdMinutes: STALE_THRESHOLD_MINUTES,
167+
executions: {
168+
found: staleExecutions.length,
169+
cleaned,
170+
failed,
171+
thresholdMinutes: STALE_THRESHOLD_MINUTES,
172+
},
173+
asyncJobs: {
174+
staleProcessingMarkedFailed: asyncJobsMarkedFailed,
175+
stalePendingMarkedFailed: stalePendingJobsMarkedFailed,
176+
oldDeleted: asyncJobsDeleted,
177+
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
178+
retentionHours: JOB_RETENTION_HOURS,
179+
},
89180
})
90181
} catch (error) {
91182
logger.error('Error in stale execution cleanup job:', error)

0 commit comments

Comments
 (0)