Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
79eaf18
improvement(processing): reduce redundant DB queries in execution pre…
waleedlatif1 Feb 24, 2026
9b201d8
improvement(processing): add defensive ID check for prefetched workfl…
waleedlatif1 Feb 24, 2026
eba3358
improvement(processing): fix type safety in execution error logging
waleedlatif1 Feb 24, 2026
ffd5385
improvement(processing): replace `as any` casts with proper types in …
waleedlatif1 Feb 24, 2026
007d137
improvement(processing): use exported HighestPrioritySubscription typ…
waleedlatif1 Feb 24, 2026
6540453
improvement(processing): replace remaining `as any` casts with proper…
waleedlatif1 Feb 24, 2026
1b3708a
fix(processing): prevent double-billing race in LoggingSession comple…
waleedlatif1 Feb 24, 2026
a1b91c1
fix(processing): unblock error responses and isolate run-count failures
waleedlatif1 Feb 24, 2026
540e0be
improvement(processing): remove dead setupExecutor method
waleedlatif1 Feb 24, 2026
4fae73b
remove logger.debug
waleedlatif1 Feb 24, 2026
e7b216b
fix(processing): guard completionPromise as write-once (singleton pro…
waleedlatif1 Feb 24, 2026
f093193
improvement(processing): remove empty else/catch blocks left by debug…
waleedlatif1 Feb 24, 2026
8e72e13
fix(processing): enforce waitForCompletion inside markAsFailed to pre…
waleedlatif1 Feb 24, 2026
5a8e611
fix(processing): reset completing flag on fallback failure, clean up …
waleedlatif1 Feb 24, 2026
6004fea
fix(processing): restore disconnect error logging in MCP test-connection
waleedlatif1 Feb 24, 2026
4c78376
fix(processing): address audit findings across branch
waleedlatif1 Feb 24, 2026
dff3f64
revert: undo unnecessary subscription null→undefined change
waleedlatif1 Feb 24, 2026
e1981b6
improvement(processing): remove dead try/catch around getHighestPrior…
waleedlatif1 Feb 24, 2026
2860663
improvement(processing): remove dead getSnapshotByHash method
waleedlatif1 Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const shouldUseDraftState = isPublicApiAccess
? false
: (useDraftState ?? auth.authType === 'session')
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId,
action: shouldUseDraftState ? 'write' : 'read',
})
if (!workflowAuthorization.allowed) {
return NextResponse.json(
{ error: workflowAuthorization.message || 'Access denied' },
{ status: workflowAuthorization.status }
)
}

const streamHeader = req.headers.get('X-Stream-Response') === 'true'
const enableSSE = streamHeader || streamParam === true
const executionModeHeader = req.headers.get('X-Execution-Mode')
Expand Down Expand Up @@ -455,6 +443,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const useAuthenticatedUserAsActor =
isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal')

// Authorization fetches the full workflow record and checks workspace permissions.
// Run it first so we can pass the record to preprocessing (eliminates a duplicate DB query).
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId,
action: shouldUseDraftState ? 'write' : 'read',
})
if (!workflowAuthorization.allowed) {
return NextResponse.json(
{ error: workflowAuthorization.message || 'Access denied' },
{ status: workflowAuthorization.status }
)
}

// Pass the pre-fetched workflow record to skip the redundant Step 1 DB query in preprocessing.
const preprocessResult = await preprocessExecution({
workflowId,
userId,
Expand All @@ -465,6 +468,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
useDraftState: shouldUseDraftState,
useAuthenticatedUserAsActor,
workflowRecord: workflowAuthorization.workflow ?? undefined,
})

if (!preprocessResult.success) {
Expand Down Expand Up @@ -514,7 +518,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
try {
const workflowData = shouldUseDraftState
? await loadWorkflowFromNormalizedTables(workflowId)
: await loadDeployedWorkflowState(workflowId)
: await loadDeployedWorkflowState(workflowId, workspaceId)

if (workflowData) {
const deployedVariables =
Expand Down Expand Up @@ -694,10 +698,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:

const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
// Fire-and-forget: execution-core.ts already handles logging via its own
// fire-and-forget call. The `completed` guard in LoggingSession prevents
// double-writes, so this is a no-op — but we avoid awaiting it to reduce
// error-response latency.
const { traceSpans, totalDuration } = executionResult
? buildTraceSpans(executionResult)
: { traceSpans: [], totalDuration: 0 }

void loggingSession.safeCompleteWithError({
Comment thread
waleedlatif1 marked this conversation as resolved.
Outdated
Comment thread
waleedlatif1 marked this conversation as resolved.
Outdated
totalDurationMs: totalDuration || executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
traceSpans,
})

return NextResponse.json(
Expand All @@ -718,11 +730,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} finally {
timeoutController.cleanup()
if (executionId) {
try {
await cleanupExecutionBase64Cache(executionId)
} catch (error) {
void cleanupExecutionBase64Cache(executionId).catch((error) => {
logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error })
}
})
Comment thread
waleedlatif1 marked this conversation as resolved.
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion apps/sim/background/schedule-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ async function runWorkflowExecution({
}): Promise<RunWorkflowResult> {
try {
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const deployedData = await loadDeployedWorkflowState(
payload.workflowId,
workflowRecord.workspaceId ?? undefined
)

const blocks = deployedData.blocks
const { deploymentVersionId } = deployedData
Expand Down
6 changes: 1 addition & 5 deletions apps/sim/background/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
Expand Down Expand Up @@ -79,10 +78,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
variables: {},
})

const workflow = await getWorkflowById(workflowId)
if (!workflow) {
throw new Error(`Workflow ${workflowId} not found after preprocessing`)
}
Comment thread
waleedlatif1 marked this conversation as resolved.
const workflow = preprocessResult.workflowRecord!

const metadata: ExecutionMetadata = {
requestId,
Expand Down
15 changes: 11 additions & 4 deletions apps/sim/lib/billing/calculations/usage-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { db } from '@sim/db'
import { member, organization, userStats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray } from 'drizzle-orm'
import type { HighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getUserUsageLimit } from '@/lib/billing/core/usage'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'

Expand All @@ -21,7 +22,10 @@ interface UsageData {
* Checks a user's cost usage against their subscription plan limit
* and returns usage information including whether they're approaching the limit
*/
export async function checkUsageStatus(userId: string): Promise<UsageData> {
export async function checkUsageStatus(
userId: string,
preloadedSubscription?: HighestPrioritySubscription
): Promise<UsageData> {
try {
// If billing is disabled, always return permissive limits
if (!isBillingEnabled) {
Expand All @@ -42,7 +46,7 @@ export async function checkUsageStatus(userId: string): Promise<UsageData> {
}

// Get usage limit from user_stats (per-user cap)
const limit = await getUserUsageLimit(userId)
const limit = await getUserUsageLimit(userId, preloadedSubscription)
logger.info('Using stored usage limit', { userId, limit })

// Get actual usage from the database
Expand Down Expand Up @@ -228,7 +232,10 @@ export async function checkAndNotifyUsage(userId: string): Promise<void> {
* @param userId The ID of the user to check
* @returns An object containing the exceeded status and usage details
*/
export async function checkServerSideUsageLimits(userId: string): Promise<{
export async function checkServerSideUsageLimits(
userId: string,
preloadedSubscription?: HighestPrioritySubscription
): Promise<{
isExceeded: boolean
currentUsage: number
limit: number
Expand Down Expand Up @@ -314,7 +321,7 @@ export async function checkServerSideUsageLimits(userId: string): Promise<{
}
}

const usageData = await checkUsageStatus(userId)
const usageData = await checkUsageStatus(userId, preloadedSubscription)

return {
isExceeded: usageData.isExceeded,
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/lib/billing/core/plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { checkEnterprisePlan, checkProPlan, checkTeamPlan } from '@/lib/billing/

const logger = createLogger('PlanLookup')

export type HighestPrioritySubscription = Awaited<ReturnType<typeof getHighestPrioritySubscription>>

/**
* Get the highest priority active subscription for a user
* Priority: Enterprise > Team > Pro > Free
Expand Down
10 changes: 8 additions & 2 deletions apps/sim/lib/billing/core/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,14 @@ export async function updateUserUsageLimit(
* Free/Pro: Individual user limit from userStats
* Team/Enterprise: Organization limit
*/
export async function getUserUsageLimit(userId: string): Promise<number> {
const subscription = await getHighestPrioritySubscription(userId)
export async function getUserUsageLimit(
userId: string,
preloadedSubscription?: Awaited<ReturnType<typeof getHighestPrioritySubscription>>
Comment thread
waleedlatif1 marked this conversation as resolved.
Outdated
): Promise<number> {
const subscription =
preloadedSubscription !== undefined
? preloadedSubscription
: await getHighestPrioritySubscription(userId)
Comment thread
waleedlatif1 marked this conversation as resolved.

if (!subscription || subscription.plan === 'free' || subscription.plan === 'pro') {
// Free/Pro: Use individual limit from userStats
Expand Down
1 change: 0 additions & 1 deletion apps/sim/lib/billing/organizations/membership.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ export async function restoreUserProSubscription(userId: string): Promise<Restor
.where(eq(subscriptionTable.id, personalPro.id))

result.restored = true

logger.info('Restored personal Pro subscription', {
userId,
subscriptionId: personalPro.id,
Expand Down
Loading