diff --git a/backend/src/entities/cron-jobs/cron-jobs.service.ts b/backend/src/entities/cron-jobs/cron-jobs.service.ts index 844b82b1f..a13d3e625 100644 --- a/backend/src/entities/cron-jobs/cron-jobs.service.ts +++ b/backend/src/entities/cron-jobs/cron-jobs.service.ts @@ -50,13 +50,12 @@ export class CronJobsService { return; } - await slackPostMessage(`email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); + await slackPostMessage(`Email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); try { let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute(); - console.log(`Retrieved ${emails.length} email addresses from use case`); await slackPostMessage( - `Successfully retrieved ${emails.length} email addresses from use case`, + `Retrieved ${emails.length} email addresses from database`, Constants.EXCEPTIONS_CHANNELS, ); @@ -64,24 +63,21 @@ export class CronJobsService { emails = emails.filter((email) => { return ValidationHelper.isValidEmail(email); }); - console.log(`Filtered out ${emailsBefore - emails.length} invalid or demo emails`); + + const filteredOutEmailsCount = emailsBefore - emails.length; await slackPostMessage( - `Found ${emails.length} valid emails. starting messaging`, + `Found ${emails.length} valid emails${filteredOutEmailsCount ? `. Filtered out ${filteredOutEmailsCount} invalid or demo emails` : ``}. Starting messaging`, Constants.EXCEPTIONS_CHANNELS, ); + const batchSize = 10; let allMailingResults = []; for (let i = 0; i < emails.length; i += batchSize) { const emailsBatch = emails.slice(i, i + batchSize); - console.log( - `Processing email batch ${Math.floor(i / batchSize) + 1} of ${Math.ceil(emails.length / batchSize)}, with ${emailsBatch.length} emails`, - ); - try { const batchResults = await this.emailService.sendRemindersToUsers(emailsBatch); - console.log(`Batch ${Math.floor(i / batchSize) + 1} completed with ${batchResults.length} results`); allMailingResults = [...allMailingResults, ...batchResults]; } catch (error) { console.error(`Error processing batch ${Math.floor(i / batchSize) + 1}: ${error.message}`); @@ -90,15 +86,12 @@ export class CronJobsService { await new Promise((resolve) => setTimeout(resolve, 1000)); } - console.log(`Email sending completed. Total results: ${allMailingResults.length}`); - if (allMailingResults.length === 0) { const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)'; await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS); await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); } else { - await slackPostMessage(`Sent ${allMailingResults.length} emails successfully`, Constants.EXCEPTIONS_CHANNELS); - // await this.sendEmailResultsToSlack(allMailingResults, emails); + await this.sendEmailResultsToSlack(allMailingResults, emails); await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); } } catch (innerError) { diff --git a/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts b/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts index a42c203d2..9b9a2c774 100644 --- a/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts +++ b/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts @@ -23,7 +23,7 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio const distinctUsers = await this.findDistinctUsersForProcessing(); const batchSize = 10; const emails: string[] = []; - const queue = new PQueue({ concurrency: 2 }); + const queue = new PQueue({ concurrency: 5 }); for (let i = 0; i < distinctUsers.length; i += batchSize) { const batch = distinctUsers.slice(i, i + batchSize); for (const user of batch) { @@ -81,35 +81,28 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio try { const nonFinishedActionsQuery = this.userActionRepository .createQueryBuilder('user_action') - .select(['user.id as id', 'user.email as email']) + .select('DISTINCT user.id as id') + .addSelect('user.email as email') .innerJoin('user_action.user', 'user') .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) .andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() }) .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }) .andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED }); - const pageSize = 100; - let page = 0; - let hasMore = true; + const uniqueUsersMap = new Map(); - while (hasMore) { - const paginatedQuery = nonFinishedActionsQuery.skip(page * pageSize).take(pageSize); - const batch = await paginatedQuery.getRawMany(); - if (batch.length === 0) { - hasMore = false; - } else { - batch.forEach((user) => { - if (!uniqueUsersMap.has(user.id)) { - uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); - } - }); - page++; + + const nonFinishedUsers = await nonFinishedActionsQuery.getRawMany(); + + nonFinishedUsers.forEach((user) => { + if (!uniqueUsersMap.has(user.id)) { + uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); } - } - page = 0; - hasMore = true; + }); + const usersWithoutLogsQuery = this.userRepository .createQueryBuilder('user') - .select(['user.id as id', 'user.email as email']) + .select('DISTINCT user.id as id') + .addSelect('user.email as email') .leftJoin('user.groups', 'group') .leftJoin('group.connection', 'connection') .leftJoin('connection.logs', 'tableLogs') @@ -117,20 +110,14 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) .andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false }) .andWhere('tableLogs.id is null'); - while (hasMore) { - const paginatedQuery = usersWithoutLogsQuery.skip(page * pageSize).take(pageSize); - const batch = await paginatedQuery.getRawMany(); - if (batch.length === 0) { - hasMore = false; - } else { - batch.forEach((user) => { - if (!uniqueUsersMap.has(user.id)) { - uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); - } - }); - page++; + + const usersWithoutLogs = await usersWithoutLogsQuery.getRawMany(); + + usersWithoutLogs.forEach((user) => { + if (!uniqueUsersMap.has(user.id)) { + uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); } - } + }); return Array.from(uniqueUsersMap.values()); } catch (error) { diff --git a/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts b/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts index 3b1281022..f6f764cf0 100644 --- a/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts +++ b/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts @@ -16,7 +16,6 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd private readonly tableLogsRepository: Repository, ) {} public async execute(): Promise { - console.info('Updating actions started'); const uniqueUserIds: Array = await this.findDistinctUserIdsWithNonFinishedActions(); const batchSize = 3; @@ -31,8 +30,8 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd } private async checkUserLogsAndUpdateAction(userId: string): Promise { - const successFullTableReceivingUserLogs = await this.findSuccessfulTableReceivingUserLogs(userId); - if (!successFullTableReceivingUserLogs || successFullTableReceivingUserLogs.length === 0) { + const hasSuccessfulTableReceivingLogs = await this.findSuccessfulTableReceivingUserLogs(userId); + if (!hasSuccessfulTableReceivingLogs) { return; } const foundUserAction = await this.findNonFinishedConnectionCreationUserAction(userId); @@ -43,16 +42,6 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd await this.userActionRepository.save(foundUserAction); } - private async findAllNonFinishedActionsTwoWeeksOld(): Promise> { - const notFinishedActionsQb = this.userActionRepository - .createQueryBuilder('user_action') - .leftJoinAndSelect('user_action.user', 'user') - .andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() }) - .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }) - .andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED }); - return await notFinishedActionsQb.getMany(); - } - private async findDistinctUserIdsWithNonFinishedActions(): Promise> { const result = await this.userActionRepository .createQueryBuilder('user_action') @@ -66,24 +55,27 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd return result.map((item) => item.userId); } - private async findSuccessfulTableReceivingUserLogs(userId: string): Promise> { - const userLogsQB = this.tableLogsRepository + private async findSuccessfulTableReceivingUserLogs(userId: string): Promise { + const userLogsCount = await this.tableLogsRepository .createQueryBuilder('tableLogs') - .leftJoinAndSelect('tableLogs.connection_id', 'connection') - .leftJoinAndSelect('connection.groups', 'group') - .leftJoinAndSelect('group.users', 'user') - .andWhere('user.id = :user_id', { user_id: userId }) + .leftJoin('tableLogs.connection_id', 'connection') + .leftJoin('connection.groups', 'group') + .leftJoin('group.users', 'user') + .where('user.id = :user_id', { user_id: userId }) .andWhere('tableLogs.operationType = :operationType', { operationType: LogOperationTypeEnum.rowsReceived }) .andWhere('tableLogs.operationStatusResult = :operationStatusResult', { operationStatusResult: OperationResultStatusEnum.successfully, - }); - return await userLogsQB.getMany(); + }) + .select('1') + .limit(1) + .getRawOne(); + return !!userLogsCount; } private async findNonFinishedConnectionCreationUserAction(userId: string): Promise { const actionQb = this.userActionRepository .createQueryBuilder('user_action') - .leftJoinAndSelect('user_action.user', 'user') + .leftJoin('user_action.user', 'user') .andWhere('user.id = :user_id', { user_id: userId }) .andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED }); return await actionQb.getOne(); diff --git a/backend/src/helpers/constants/constants.ts b/backend/src/helpers/constants/constants.ts index 91277a1f9..49824c0fd 100644 --- a/backend/src/helpers/constants/constants.ts +++ b/backend/src/helpers/constants/constants.ts @@ -65,6 +65,15 @@ export const Constants = { return today; }, + ONE_MONTH_AND_A_WEEK_AGO: (): Date => { + const today = new Date(); + const oneMonthAgo = today.getMonth() - 1; + const oneWeekAgo = today.getDate() - 7; + today.setMonth(oneMonthAgo); + today.setDate(oneWeekAgo); + return today; + }, + TWO_WEEKS_AGO: (): Date => { const currentDate = Date.now(); const twoWeeksInMs = 1209600000;