From 59c0c32f758ec88660df762ecbfe3ed2934a916b Mon Sep 17 00:00:00 2001 From: Artem Niehrieiev Date: Thu, 4 Sep 2025 07:49:26 +0000 Subject: [PATCH] refactor: enhance email processing efficiency by adjusting batch sizes and improving error handling --- .../entities/cron-jobs/cron-jobs.service.ts | 110 +++++++++++---- .../src/entities/email/email/email.service.ts | 23 ++- ...sers-actions-and-mailing-users.use.case.ts | 131 +++++++++++------- 3 files changed, 182 insertions(+), 82 deletions(-) diff --git a/backend/src/entities/cron-jobs/cron-jobs.service.ts b/backend/src/entities/cron-jobs/cron-jobs.service.ts index 3640b5e94..844b82b1f 100644 --- a/backend/src/entities/cron-jobs/cron-jobs.service.ts +++ b/backend/src/entities/cron-jobs/cron-jobs.service.ts @@ -46,34 +46,76 @@ export class CronJobsService { try { const isJobAdded = await this.insertMorningJob(); if (!isJobAdded) { + console.log('Job already in progress, exiting'); return; } await slackPostMessage(`email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); - let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute(); - emails = emails.filter((email) => { - if (email && /^demo_.*@rocketadmin\.com$/i.test(email)) { - return false; + try { + let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute(); + console.log(`Retrieved ${emails.length} email addresses from use case`); + await slackPostMessage( + `Successfully retrieved ${emails.length} email addresses from use case`, + Constants.EXCEPTIONS_CHANNELS, + ); + + const emailsBefore = emails.length; + emails = emails.filter((email) => { + return ValidationHelper.isValidEmail(email); + }); + console.log(`Filtered out ${emailsBefore - emails.length} invalid or demo emails`); + + await slackPostMessage( + `Found ${emails.length} valid emails. starting messaging`, + Constants.EXCEPTIONS_CHANNELS, + ); + const batchSize = 10; + let allMailingResults = []; + + for (let i = 0; i < emails.length; i += batchSize) { + const emailsBatch = emails.slice(i, i + batchSize); + console.log( + `Processing email batch ${Math.floor(i / batchSize) + 1} of ${Math.ceil(emails.length / batchSize)}, with ${emailsBatch.length} emails`, + ); + + try { + const batchResults = await this.emailService.sendRemindersToUsers(emailsBatch); + console.log(`Batch ${Math.floor(i / batchSize) + 1} completed with ${batchResults.length} results`); + allMailingResults = [...allMailingResults, ...batchResults]; + } catch (error) { + console.error(`Error processing batch ${Math.floor(i / batchSize) + 1}: ${error.message}`); + Sentry.captureException(error); + } + await new Promise((resolve) => setTimeout(resolve, 1000)); } - return ValidationHelper.isValidEmail(email); - }); - await slackPostMessage(`found ${emails.length} valid emails. starting messaging`, Constants.EXCEPTIONS_CHANNELS); - const mailingResults = await this.emailService.sendRemindersToUsers(emails); - if (mailingResults.length === 0) { - const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)'; - await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS); - await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); - } else { - await slackPostMessage(JSON.stringify(mailingResults), Constants.EXCEPTIONS_CHANNELS); - // await this.sendEmailResultsToSlack(mailingResults, emails); - await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); + console.log(`Email sending completed. Total results: ${allMailingResults.length}`); + + if (allMailingResults.length === 0) { + const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)'; + await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS); + await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); + } else { + await slackPostMessage(`Sent ${allMailingResults.length} emails successfully`, Constants.EXCEPTIONS_CHANNELS); + // await this.sendEmailResultsToSlack(allMailingResults, emails); + await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); + } + } catch (innerError) { + console.error('Detailed error in email processing:', innerError); + const errorMessage = innerError.stack + ? `${innerError.message}\n${innerError.stack.split('\n').slice(0, 5).join('\n')}` + : innerError.message; + await slackPostMessage(`Error in email processing: ${errorMessage}`, Constants.EXCEPTIONS_CHANNELS); + Sentry.captureException(innerError); } } catch (e) { - await slackPostMessage(`Error in morning cron: ${e.message}`, Constants.EXCEPTIONS_CHANNELS); + console.error('Main cron handler error:', e); + const errorMessage = e.stack ? `${e.message}\n${e.stack.split('\n').slice(0, 5).join('\n')}` : e.message; + await slackPostMessage(`Error in morning cron: ${errorMessage}`, Constants.EXCEPTIONS_CHANNELS); Sentry.captureException(e); - console.error(e); + } finally { + console.log('Cron job completed at', new Date().toISOString()); } } @@ -126,9 +168,16 @@ export class CronJobsService { const filteredResults = results.filter((result) => !!result); const nullResultsCount = results.length - filteredResults.length; const chunkSize = 20; - const emailsNonFoundInResults = allFoundEmails.filter( - (email) => !filteredResults.some((result) => result?.accepted?.includes(email)), - ); + + const foundEmails = new Set(); + filteredResults.forEach((result) => { + if (result?.accepted) { + result.accepted.forEach((email) => foundEmails.add(email)); + } + }); + + const emailsNonFoundInResults = allFoundEmails.filter((email) => !foundEmails.has(email)); + for (let i = 0; i < filteredResults.length; i += chunkSize) { const chunk = filteredResults.slice(i, i + chunkSize); const message = this.emailCronResultToSlackString(chunk); @@ -136,12 +185,25 @@ export class CronJobsService { continue; } await slackPostMessage(message, Constants.EXCEPTIONS_CHANNELS); + await new Promise((resolve) => setTimeout(resolve, 100)); } + if (nullResultsCount > 0) { const timedOutMessage = `The system timed out while sending results to ${nullResultsCount} email addresses`; - const timedOutEmailsMessage = `: \n${emailsNonFoundInResults.join(', ')}\n`; - const fullTimedOutMessage = `${timedOutMessage}${timedOutEmailsMessage}`; - await slackPostMessage(fullTimedOutMessage, Constants.EXCEPTIONS_CHANNELS); + if (emailsNonFoundInResults.length > 100) { + await slackPostMessage(timedOutMessage, Constants.EXCEPTIONS_CHANNELS); + for (let i = 0; i < emailsNonFoundInResults.length; i += 100) { + const emailsChunk = emailsNonFoundInResults.slice(i, i + 100); + await slackPostMessage( + `Failed emails (chunk ${i / 100 + 1}): ${emailsChunk.join(', ')}`, + Constants.EXCEPTIONS_CHANNELS, + ); + } + } else { + const timedOutEmailsMessage = `: \n${emailsNonFoundInResults.join(', ')}\n`; + const fullTimedOutMessage = `${timedOutMessage}${timedOutEmailsMessage}`; + await slackPostMessage(fullTimedOutMessage, Constants.EXCEPTIONS_CHANNELS); + } } } diff --git a/backend/src/entities/email/email/email.service.ts b/backend/src/entities/email/email/email.service.ts index f75dd7edd..02154bc68 100644 --- a/backend/src/entities/email/email/email.service.ts +++ b/backend/src/entities/email/email/email.service.ts @@ -80,14 +80,25 @@ export class EmailService { } public async sendRemindersToUsers(userEmails: Array): Promise> { - const queue = new PQueue({ concurrency: 8 }); - const mailingResults: Array = await Promise.all( - userEmails.map(async (email: string) => { - return await queue.add(async () => { + const queue = new PQueue({ concurrency: 3 }); + + const mailingResults: Array = []; + + for (const email of userEmails) { + try { + const result = await queue.add(async () => { return await this.sendReminderToUser(email); }); - }), - ); + mailingResults.push(result); + } catch (error) { + this.logger.error(`Failed to send reminder to ${email}: ${error.message}`); + Sentry.captureException(error); + mailingResults.push(null); + } + } + + await queue.onIdle(); + return this.buildMailingResults(mailingResults); } diff --git a/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts b/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts index b9ae5bd9a..a42c203d2 100644 --- a/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts +++ b/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts @@ -21,34 +21,34 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio public async execute(): Promise> { try { const distinctUsers = await this.findDistinctUsersForProcessing(); - const batchSize = 2; - const queue = new PQueue({ concurrency: 3 }); + const batchSize = 10; const emails: string[] = []; - - try { - for (let i = 0; i < distinctUsers.length; i += batchSize) { - const batch = distinctUsers.slice(i, i + batchSize); - await Promise.all( - batch.map((user) => - queue.add(async () => { - await this.updateOrCreateActionForUser(user); - if (user.email) { - emails.push(user.email.toLowerCase()); - } - }), - ), - ); + const queue = new PQueue({ concurrency: 2 }); + for (let i = 0; i < distinctUsers.length; i += batchSize) { + const batch = distinctUsers.slice(i, i + batchSize); + for (const user of batch) { + try { + await queue.add(async () => { + await this.updateOrCreateActionForUser(user); + if (user.email) { + emails.push(user.email.toLowerCase()); + } + }); + } catch (error) { + console.error(`Error processing user ${user.id}: ${error.message}`); + Sentry.captureException(error); + } } - await queue.onIdle(); - } finally { - queue.clear(); + if (i + batchSize < distinctUsers.length) { + await new Promise((resolve) => setTimeout(resolve, 500)); + } } - + queue.clear(); return getUniqArrayStrings(emails); } catch (e) { + console.error('Error in CheckUsersActionsAndMailingUsersUseCase.execute():', e); Sentry.captureException(e); - console.error(e); return []; } } @@ -78,38 +78,65 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio } private async findDistinctUsersForProcessing(): Promise> { - const nonFinishedActionsQuery = this.userActionRepository - .createQueryBuilder('user_action') - .select(['user.id as id', 'user.email as email']) - .innerJoin('user_action.user', 'user') - .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) - .andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() }) - .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }) - .andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED }); - - const usersWithoutLogsQuery = this.userRepository - .createQueryBuilder('user') - .select(['user.id as id', 'user.email as email']) - .leftJoin('user.groups', 'group') - .leftJoin('group.connection', 'connection') - .leftJoin('connection.logs', 'tableLogs') - .leftJoin('user.user_action', 'user_action') - .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) - .andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false }) - .andWhere('tableLogs.id is null'); - - const nonFinishedUsersResult = await nonFinishedActionsQuery.getRawMany(); - const usersWithoutLogsResult = await usersWithoutLogsQuery.getRawMany(); - - const combinedUsers = [...nonFinishedUsersResult, ...usersWithoutLogsResult]; - const uniqueUsersMap = new Map(); - - combinedUsers.forEach((user) => { - if (!uniqueUsersMap.has(user.id)) { - uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); + try { + const nonFinishedActionsQuery = this.userActionRepository + .createQueryBuilder('user_action') + .select(['user.id as id', 'user.email as email']) + .innerJoin('user_action.user', 'user') + .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) + .andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() }) + .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }) + .andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED }); + const pageSize = 100; + let page = 0; + let hasMore = true; + const uniqueUsersMap = new Map(); + while (hasMore) { + const paginatedQuery = nonFinishedActionsQuery.skip(page * pageSize).take(pageSize); + const batch = await paginatedQuery.getRawMany(); + if (batch.length === 0) { + hasMore = false; + } else { + batch.forEach((user) => { + if (!uniqueUsersMap.has(user.id)) { + uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); + } + }); + page++; + } + } + page = 0; + hasMore = true; + const usersWithoutLogsQuery = this.userRepository + .createQueryBuilder('user') + .select(['user.id as id', 'user.email as email']) + .leftJoin('user.groups', 'group') + .leftJoin('group.connection', 'connection') + .leftJoin('connection.logs', 'tableLogs') + .leftJoin('user.user_action', 'user_action') + .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) + .andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false }) + .andWhere('tableLogs.id is null'); + while (hasMore) { + const paginatedQuery = usersWithoutLogsQuery.skip(page * pageSize).take(pageSize); + const batch = await paginatedQuery.getRawMany(); + if (batch.length === 0) { + hasMore = false; + } else { + batch.forEach((user) => { + if (!uniqueUsersMap.has(user.id)) { + uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); + } + }); + page++; + } } - }); - return Array.from(uniqueUsersMap.values()); + return Array.from(uniqueUsersMap.values()); + } catch (error) { + console.error('Error in findDistinctUsersForProcessing:', error); + Sentry.captureException(error); + return []; + } } }