@@ -347,12 +347,6 @@ async function fetchAndProcessAirtablePayloads(
347347 const consolidatedChangesMap = new Map<string, AirtableChange>()
348348 let localProviderConfig = { ...((webhookData.providerConfig as Record<string, any>) || {}) } // Local copy
349349
350- // Add log at the very beginning of the try block
351- logger.info(`[${requestId}] Entering fetchAndProcessAirtablePayloads try block`, {
352- webhookId: webhookData.id,
353- workflowId: workflowData.id,
354- })
355-
356350 try {
357351 // --- Essential IDs & Config from localProviderConfig ---
358352 const baseId = localProviderConfig.baseId
@@ -373,28 +367,14 @@ async function fetchAndProcessAirtablePayloads(
373367
374368 // --- Retrieve Stored Cursor from localProviderConfig ---
375369 const storedCursor = localProviderConfig.externalWebhookCursor
376- logger.info(`[${requestId}] Retrieved providerConfig for cursor check`, {
377- webhookId: webhookData.id,
378- storedCursorValue: storedCursor,
379- type: typeof storedCursor,
380- })
381370
382- // IMPORTANT FIX: Initialize cursor in provider config if missing
371+ // Initialize cursor in provider config if missing
383372 if (storedCursor === undefined || storedCursor === null) {
384- logger.info(`[${requestId}] Cursor is missing in providerConfig, initializing it`, {
385- webhookId: webhookData.id,
386- })
387-
388373 // Update the local copy
389374 localProviderConfig.externalWebhookCursor = null
390375
391376 // Add cursor to the database immediately to fix the configuration
392377 try {
393- // Add log before the DB update
394- logger.info(`[${requestId}] Attempting to initialize cursor in DB`, {
395- webhookId: webhookData.id,
396- configToSave: { ...localProviderConfig, externalWebhookCursor: null },
397- })
398378 await db
399379 .update(webhook)
400380 .set({
@@ -404,9 +384,6 @@ async function fetchAndProcessAirtablePayloads(
404384 .where(eq(webhook.id, webhookData.id))
405385
406386 localProviderConfig.externalWebhookCursor = null // Update local copy too
407- logger.info(`[${requestId}] Successfully initialized cursor in DB`, {
408- webhookId: webhookData.id,
409- })
410387 } catch (initError: any) {
411388 logger.error(`[${requestId}] Failed to initialize cursor in DB`, {
412389 webhookId: webhookData.id,
@@ -425,22 +402,11 @@ async function fetchAndProcessAirtablePayloads(
425402
426403 if (storedCursor && typeof storedCursor === 'number') {
427404 currentCursor = storedCursor
428- logger.info(`[${requestId}] Using stored cursor`, {
429- webhookId: webhookData.id,
430- cursor: currentCursor,
431- })
432405 } else {
433- logger.info(`[${requestId}] No valid stored cursor found, starting poll`, {
434- webhookId: webhookData.id,
435- })
436406 currentCursor = null // Airtable API defaults to 1 if omitted
437407 }
438408
439409 // --- Get OAuth Token ---
440- logger.info(`[${requestId}] Attempting to get OAuth token`, {
441- userId: workflowData.userId,
442- provider: 'airtable',
443- })
444410 let accessToken: string | null = null
445411 try {
446412 accessToken = await getOAuthToken(workflowData.userId, 'airtable')
@@ -451,9 +417,6 @@ async function fetchAndProcessAirtablePayloads(
451417 )
452418 throw new Error('Airtable access token not found.')
453419 }
454- logger.info(`[${requestId}] Successfully obtained Airtable access token`, {
455- userId: workflowData.userId,
456- })
457420 } catch (tokenError: any) {
458421 logger.error(
459422 `[${requestId}] Failed to get Airtable OAuth token for user ${workflowData.userId}`,
@@ -489,10 +452,6 @@ async function fetchAndProcessAirtablePayloads(
489452 queryParams.set('cursor', currentCursor.toString())
490453 }
491454 const fullUrl = `${apiUrl}?${queryParams.toString()}`
492- logger.info(`[${requestId}] Calling Airtable GET /payloads (Call ${apiCallCount})`, {
493- url: apiUrl,
494- cursor: currentCursor,
495- })
496455
497456 try {
498457 const response = await fetch(fullUrl, {
@@ -521,14 +480,6 @@ async function fetchAndProcessAirtablePayloads(
521480 }
522481
523482 const receivedPayloads = responseBody.payloads || []
524- logger.info(
525- `[${requestId}] Received response from Airtable /payloads (Call ${apiCallCount})`,
526- {
527- payloadCount: receivedPayloads.length,
528- mightHaveMore: responseBody.mightHaveMore,
529- nextCursor: responseBody.cursor,
530- }
531- )
532483
533484 // --- Process and Consolidate Changes ---
534485 if (receivedPayloads.length > 0) {
@@ -609,20 +560,9 @@ async function fetchAndProcessAirtablePayloads(
609560 mightHaveMore = responseBody.mightHaveMore || false
610561
611562 if (nextCursor && typeof nextCursor === 'number' && nextCursor !== currentCursor) {
612- logger.info(`[${requestId}] Updating cursor for next potential iteration`, {
613- webhookId: webhookData.id,
614- previousCursor: currentCursor,
615- newCursor: nextCursor,
616- mightHaveMore,
617- })
618563 currentCursor = nextCursor
619564 // --- Add logging before and after DB update ---
620565 const updatedConfig = { ...localProviderConfig, externalWebhookCursor: currentCursor }
621- logger.info(`[${requestId}] Attempting to persist new cursor to DB`, {
622- webhookId: webhookData.id,
623- cursor: currentCursor,
624- configToSave: updatedConfig, // Log the object being saved
625- })
626566 try {
627567 // Force a complete object update to ensure consistency in serverless env
628568 await db
@@ -634,10 +574,6 @@ async function fetchAndProcessAirtablePayloads(
634574 .where(eq(webhook.id, webhookData.id))
635575
636576 localProviderConfig.externalWebhookCursor = currentCursor // Update local copy too
637- logger.info(`[${requestId}] Successfully persisted new cursor to DB`, {
638- webhookId: webhookData.id,
639- newCursor: currentCursor,
640- })
641577 } catch (dbError: any) {
642578 logger.error(`[${requestId}] Failed to persist Airtable cursor to DB`, {
643579 webhookId: webhookData.id,
@@ -661,10 +597,6 @@ async function fetchAndProcessAirtablePayloads(
661597 })
662598 mightHaveMore = false
663599 } else {
664- logger.info(
665- `[${requestId}] Cursor unchanged or no new cursor, ending payload fetch loop`,
666- { webhookId: webhookData.id, finalCursor: currentCursor, apiCall: apiCallCount }
667- )
668600 mightHaveMore = false // Explicitly stop if cursor hasn't changed
669601 }
670602 } catch (fetchError: any) {
@@ -687,21 +619,8 @@ async function fetchAndProcessAirtablePayloads(
687619 // Convert map values to array for final processing
688620 const finalConsolidatedChanges = Array.from(consolidatedChangesMap.values())
689621
690- logger.info(`[${requestId}] Finished polling Airtable`, {
691- webhookId: webhookData.id,
692- totalPayloadsFetched: payloadsFetched,
693- consolidatedCount: finalConsolidatedChanges.length,
694- finalCursor: currentCursor,
695- })
696-
697622 // --- Execute Workflow if we have changes (simplified - no lock check) ---
698623 if (finalConsolidatedChanges.length > 0) {
699- logger.info(
700- `[${requestId}] Triggering workflow execution with ${finalConsolidatedChanges.length} changes`,
701- {
702- webhookId: webhookData.id,
703- }
704- )
705624 try {
706625 // Format the input for the executor using the consolidated changes
707626 const input = { airtableChanges: finalConsolidatedChanges } // Use the consolidated array
@@ -714,10 +633,6 @@ async function fetchAndProcessAirtablePayloads(
714633 executionError
715634 )
716635 }
717- } else {
718- logger.info(`[${requestId}] No new changes collected from Airtable, workflow not executed`, {
719- webhookId: webhookData.id,
720- })
721636 }
722637 } catch (error) {
723638 // Catch any unexpected errors during the setup/polling logic itself
0 commit comments