diff --git a/apps/sim/app/api/webhooks/route.ts b/apps/sim/app/api/webhooks/route.ts index 4d5508a1256..86b5962a772 100644 --- a/apps/sim/app/api/webhooks/route.ts +++ b/apps/sim/app/api/webhooks/route.ts @@ -367,9 +367,7 @@ export async function POST(request: NextRequest) { ) } - // Configure each new webhook (for providers that need configuration) - const pollingProviders = ['gmail', 'outlook'] - const needsConfiguration = pollingProviders.includes(provider) + const needsConfiguration = provider === 'gmail' || provider === 'outlook' if (needsConfiguration) { const configureFunc = diff --git a/apps/sim/lib/core/async-jobs/config.ts b/apps/sim/lib/core/async-jobs/config.ts index 6d5e020eb84..0537a6a8ef9 100644 --- a/apps/sim/lib/core/async-jobs/config.ts +++ b/apps/sim/lib/core/async-jobs/config.ts @@ -7,6 +7,7 @@ const logger = createLogger('AsyncJobsConfig') let cachedBackend: JobQueueBackend | null = null let cachedBackendType: AsyncBackendType | null = null +let cachedInlineBackend: JobQueueBackend | null = null /** * Determines which async backend to use based on environment configuration. @@ -71,6 +72,31 @@ export function getCurrentBackendType(): AsyncBackendType | null { return cachedBackendType } +/** + * Gets a job queue backend that bypasses Trigger.dev (Redis -> Database). + * Used for non-polling webhooks that should always execute inline. + */ +export async function getInlineJobQueue(): Promise { + if (cachedInlineBackend) { + return cachedInlineBackend + } + + const redis = getRedisClient() + let type: string + if (redis) { + const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis') + cachedInlineBackend = new RedisJobQueue(redis) + type = 'redis' + } else { + const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database') + cachedInlineBackend = new DatabaseJobQueue() + type = 'database' + } + + logger.info(`Inline job backend initialized: ${type}`) + return cachedInlineBackend +} + /** * Checks if jobs should be executed inline (fire-and-forget). * For Redis/DB backends, we execute inline. Trigger.dev handles execution itself. @@ -85,4 +111,5 @@ export function shouldExecuteInline(): boolean { export function resetJobQueueCache(): void { cachedBackend = null cachedBackendType = null + cachedInlineBackend = null } diff --git a/apps/sim/lib/core/async-jobs/index.ts b/apps/sim/lib/core/async-jobs/index.ts index 33bb6883029..24e6f1e526f 100644 --- a/apps/sim/lib/core/async-jobs/index.ts +++ b/apps/sim/lib/core/async-jobs/index.ts @@ -1,6 +1,7 @@ export { getAsyncBackendType, getCurrentBackendType, + getInlineJobQueue, getJobQueue, resetJobQueueCache, shouldExecuteInline, diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 07c5a2881b1..df45c1f981e 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -5,7 +5,7 @@ import { and, eq, isNull, or } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils' -import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' +import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' import { isProd } from '@/lib/core/config/feature-flags' import { safeCompare } from '@/lib/core/security/encryption' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' @@ -29,6 +29,7 @@ import { import { executeWebhookJob } from '@/background/webhook-execution' import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' import { isConfluencePayloadMatch } from '@/triggers/confluence/utils' +import { isPollingWebhookProvider } from '@/triggers/constants' import { isGitHubEventMatch } from '@/triggers/github/utils' import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils' import { isJiraEventMatch } from '@/triggers/jira/utils' @@ -1116,15 +1117,24 @@ export async function queueWebhookExecution( ...(credentialId ? { credentialId } : {}), } - const jobQueue = await getJobQueue() - const jobId = await jobQueue.enqueue('webhook-execution', payload, { - metadata: { workflowId: foundWorkflow.id, userId: actorUserId }, - }) - logger.info( - `[${options.requestId}] Queued webhook execution task ${jobId} for ${foundWebhook.provider} webhook` - ) + const isPolling = isPollingWebhookProvider(payload.provider) - if (shouldExecuteInline()) { + if (isPolling && !shouldExecuteInline()) { + const jobQueue = await getJobQueue() + const jobId = await jobQueue.enqueue('webhook-execution', payload, { + metadata: { workflowId: foundWorkflow.id, userId: actorUserId }, + }) + logger.info( + `[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue` + ) + } else { + const jobQueue = await getInlineJobQueue() + const jobId = await jobQueue.enqueue('webhook-execution', payload, { + metadata: { workflowId: foundWorkflow.id, userId: actorUserId }, + }) + logger.info( + `[${options.requestId}] Executing ${foundWebhook.provider} webhook ${jobId} inline` + ) void (async () => { try { await jobQueue.startJob(jobId) diff --git a/apps/sim/lib/webhooks/utils.server.ts b/apps/sim/lib/webhooks/utils.server.ts index 8c8e15381ec..76068e451fe 100644 --- a/apps/sim/lib/webhooks/utils.server.ts +++ b/apps/sim/lib/webhooks/utils.server.ts @@ -19,6 +19,7 @@ import { refreshAccessTokenIfNeeded, resolveOAuthAccountId, } from '@/app/api/auth/oauth/utils' +import { isPollingWebhookProvider } from '@/triggers/constants' const logger = createLogger('WebhookUtils') @@ -2222,10 +2223,7 @@ export async function syncWebhooksForCredentialSet(params: { `[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}` ) - // Polling providers get unique paths per credential (for independent state) - // External webhook providers share the same path (external service sends to one URL) - const pollingProviders = ['gmail', 'outlook', 'rss', 'imap'] - const useUniquePaths = pollingProviders.includes(provider) + const useUniquePaths = isPollingWebhookProvider(provider) const credentials = await getCredentialsForCredentialSet(credentialSetId, oauthProviderId) diff --git a/apps/sim/triggers/constants.test.ts b/apps/sim/triggers/constants.test.ts new file mode 100644 index 00000000000..6de36a36a3c --- /dev/null +++ b/apps/sim/triggers/constants.test.ts @@ -0,0 +1,41 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { POLLING_PROVIDERS } from '@/triggers/constants' +import { TRIGGER_REGISTRY } from '@/triggers/registry' + +describe('POLLING_PROVIDERS sync with TriggerConfig.polling', () => { + it('matches every trigger with polling: true in the registry', () => { + const registryPollingProviders = new Set( + Object.values(TRIGGER_REGISTRY) + .filter((t) => t.polling === true) + .map((t) => t.provider) + ) + + expect(POLLING_PROVIDERS).toEqual(registryPollingProviders) + }) + + it('no trigger with polling: true is missing from POLLING_PROVIDERS', () => { + const missing: string[] = [] + for (const trigger of Object.values(TRIGGER_REGISTRY)) { + if (trigger.polling && !POLLING_PROVIDERS.has(trigger.provider)) { + missing.push(`${trigger.id} (provider: ${trigger.provider})`) + } + } + expect(missing, `Triggers with polling: true missing from POLLING_PROVIDERS`).toEqual([]) + }) + + it('no POLLING_PROVIDERS entry lacks a polling: true trigger in the registry', () => { + const extra: string[] = [] + for (const provider of POLLING_PROVIDERS) { + const hasTrigger = Object.values(TRIGGER_REGISTRY).some( + (t) => t.provider === provider && t.polling === true + ) + if (!hasTrigger) { + extra.push(provider) + } + } + expect(extra, `POLLING_PROVIDERS entries with no matching polling trigger`).toEqual([]) + }) +}) diff --git a/apps/sim/triggers/constants.ts b/apps/sim/triggers/constants.ts index d7fcdc997b3..feff397f4cf 100644 --- a/apps/sim/triggers/constants.ts +++ b/apps/sim/triggers/constants.ts @@ -35,3 +35,15 @@ export const TRIGGER_RUNTIME_SUBBLOCK_IDS: string[] = [ * This prevents runaway errors from continuously executing failing workflows. */ export const MAX_CONSECUTIVE_FAILURES = 100 + +/** + * Set of webhook provider names that use polling-based triggers. + * Mirrors the `polling: true` flag on TriggerConfig entries. + * Used to route execution: polling providers use the full job queue + * (Trigger.dev), non-polling providers execute inline. + */ +export const POLLING_PROVIDERS = new Set(['gmail', 'outlook', 'rss', 'imap']) + +export function isPollingWebhookProvider(provider: string): boolean { + return POLLING_PROVIDERS.has(provider) +} diff --git a/apps/sim/triggers/gmail/poller.ts b/apps/sim/triggers/gmail/poller.ts index ee8a8c94718..ada550c5f34 100644 --- a/apps/sim/triggers/gmail/poller.ts +++ b/apps/sim/triggers/gmail/poller.ts @@ -30,6 +30,7 @@ export const gmailPollingTrigger: TriggerConfig = { description: 'Triggers when new emails are received in Gmail (requires Gmail credentials)', version: '1.0.0', icon: GmailIcon, + polling: true, subBlocks: [ { diff --git a/apps/sim/triggers/imap/poller.ts b/apps/sim/triggers/imap/poller.ts index cfcc5c5d724..b7a8063e1ab 100644 --- a/apps/sim/triggers/imap/poller.ts +++ b/apps/sim/triggers/imap/poller.ts @@ -12,6 +12,7 @@ export const imapPollingTrigger: TriggerConfig = { description: 'Triggers when new emails are received via IMAP (works with any email provider)', version: '1.0.0', icon: MailServerIcon, + polling: true, subBlocks: [ // Connection settings diff --git a/apps/sim/triggers/outlook/poller.ts b/apps/sim/triggers/outlook/poller.ts index 9f3d9b09b17..bd22d2d13bc 100644 --- a/apps/sim/triggers/outlook/poller.ts +++ b/apps/sim/triggers/outlook/poller.ts @@ -24,6 +24,7 @@ export const outlookPollingTrigger: TriggerConfig = { description: 'Triggers when new emails are received in Outlook (requires Microsoft credentials)', version: '1.0.0', icon: OutlookIcon, + polling: true, subBlocks: [ { diff --git a/apps/sim/triggers/rss/poller.ts b/apps/sim/triggers/rss/poller.ts index 8d295d47580..0877ee06356 100644 --- a/apps/sim/triggers/rss/poller.ts +++ b/apps/sim/triggers/rss/poller.ts @@ -8,6 +8,7 @@ export const rssPollingTrigger: TriggerConfig = { description: 'Triggers when new items are published to an RSS feed', version: '1.0.0', icon: RssIcon, + polling: true, subBlocks: [ { diff --git a/apps/sim/triggers/types.ts b/apps/sim/triggers/types.ts index 3696c4597b2..69e5a5d2fbd 100644 --- a/apps/sim/triggers/types.ts +++ b/apps/sim/triggers/types.ts @@ -25,6 +25,9 @@ export interface TriggerConfig { method?: 'POST' | 'GET' | 'PUT' | 'DELETE' headers?: Record } + + /** When true, this trigger is poll-based (cron-driven) rather than push-based. */ + polling?: boolean } export interface TriggerRegistry {