diff --git a/backend/src/entities/cron-jobs/cron-jobs.service.ts b/backend/src/entities/cron-jobs/cron-jobs.service.ts index 7428c4e44..6e078ada2 100644 --- a/backend/src/entities/cron-jobs/cron-jobs.service.ts +++ b/backend/src/entities/cron-jobs/cron-jobs.service.ts @@ -32,15 +32,9 @@ export class CronJobsService { if (!isJobAdded) { return; } - await slackPostMessage( - `midnight cron started at ${Constants.CURRENT_TIME_FORMATTED()}`, - Constants.EXCEPTIONS_CHANNELS, - ); + await slackPostMessage(`midnight cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); await this.checkUsersLogsAndUpdateActionsUseCase.execute(); - await slackPostMessage( - `midnight cron finished at ${Constants.CURRENT_TIME_FORMATTED()}`, - Constants.EXCEPTIONS_CHANNELS, - ); + await slackPostMessage(`midnight cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); } catch (e) { Sentry.captureException(e); console.error(e); @@ -55,10 +49,7 @@ export class CronJobsService { return; } - await slackPostMessage( - `email cron started at ${Constants.CURRENT_TIME_FORMATTED()}`, - Constants.EXCEPTIONS_CHANNELS, - ); + await slackPostMessage(`email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute(); emails = emails.filter((email) => { @@ -73,17 +64,11 @@ export class CronJobsService { if (mailingResults.length === 0) { const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)'; await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS); - await slackPostMessage( - `morning cron finished at ${Constants.CURRENT_TIME_FORMATTED()}`, - Constants.EXCEPTIONS_CHANNELS, - ); + await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); } else { await slackPostMessage(JSON.stringify(mailingResults), Constants.EXCEPTIONS_CHANNELS); // await this.sendEmailResultsToSlack(mailingResults, emails); - await slackPostMessage( - `morning cron finished at ${Constants.CURRENT_TIME_FORMATTED()}`, - Constants.EXCEPTIONS_CHANNELS, - ); + await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); } } catch (e) { await slackPostMessage(`Error in morning cron: ${e.message}`, Constants.EXCEPTIONS_CHANNELS); @@ -94,7 +79,9 @@ export class CronJobsService { @Cron(CronExpression.EVERY_DAY_AT_NOON) public async clearJobList(): Promise { + await slackPostMessage(`clearing cron job list at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); await this.jobListRepository.clear(); + await slackPostMessage(`cron job list cleared at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS); } private async insertMidnightJob(): Promise { @@ -159,4 +146,9 @@ export class CronJobsService { await slackPostMessage(fullTimedOutMessage, Constants.EXCEPTIONS_CHANNELS); } } + + private getCurrentTime(): string { + const now = new Date(); + return `${now.getHours()}:${now.getMinutes()}:${now.getSeconds()}`; + } } diff --git a/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts b/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts index 1add200e8..cc56dd613 100644 --- a/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts +++ b/backend/src/entities/user-actions/use-cases/check-users-actions-and-mailing-users.use.case.ts @@ -20,24 +20,32 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio ) {} public async execute(): Promise> { try { - const nonFinishedUsersActions = await this.findAllNonFinishedActionsTwoWeeksOld(); - const usersWithoutLogs = await this.findUsersWithoutLogs(); - const usersFromActions: Array = nonFinishedUsersActions.map( - (action: UserActionEntity) => action.user, - ); - const allUsersArray: Array = usersWithoutLogs.concat(usersFromActions); - const filteredUsers: Array<{ id: string; email: string }> = Array.from( - new Set(allUsersArray.map((u) => u.id)), - ).map((id) => { - return { - id: id, - email: allUsersArray.find((u) => u.id === id).email, - }; - }); + const distinctUsers = await this.findDistinctUsersForProcessing(); + const batchSize = 10; const queue = new PQueue({ concurrency: 3 }); - await Promise.all(filteredUsers.map((u) => queue.add(() => this.updateOrCreateActionForUser(u)))); - const userEmails = filteredUsers.map((u) => u.email?.toLowerCase()); - return getUniqArrayStrings(userEmails); + const emails: string[] = []; + + try { + for (let i = 0; i < distinctUsers.length; i += batchSize) { + const batch = distinctUsers.slice(i, i + batchSize); + await Promise.all( + batch.map((user) => + queue.add(async () => { + await this.updateOrCreateActionForUser(user); + if (user.email) { + emails.push(user.email.toLowerCase()); + } + }), + ), + ); + } + + await queue.onIdle(); + } finally { + queue.clear(); + } + + return getUniqArrayStrings(emails); } catch (e) { Sentry.captureException(e); console.error(e); @@ -60,37 +68,48 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio await this.userActionRepository.save(newUserActionEntity); } - private async findAllNonFinishedActionsTwoWeeksOld(): Promise> { - const notFinishedActionsQb = this.userActionRepository + private async findUserActionWithoutSentMail(userId: string): Promise { + const actionQb = this.userActionRepository .createQueryBuilder('user_action') .leftJoinAndSelect('user_action.user', 'user') - .andWhere('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) + .andWhere('user.id = :user_id', { user_id: userId }) + .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }); + return await actionQb.getOne(); + } + + private async findDistinctUsersForProcessing(): Promise> { + const nonFinishedActionsQuery = this.userActionRepository + .createQueryBuilder('user_action') + .select(['user.id as id', 'user.email as email']) + .innerJoin('user_action.user', 'user') + .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) .andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() }) .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }) .andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED }); - return await notFinishedActionsQb.getMany(); - } - private async findUsersWithoutLogs(): Promise> { - const usersQb = this.userRepository + const usersWithoutLogsQuery = this.userRepository .createQueryBuilder('user') - .leftJoinAndSelect('user.groups', 'group') - .leftJoinAndSelect('group.connection', 'connection') - .leftJoinAndSelect('connection.logs', 'tableLogs') - .leftJoinAndSelect('user.user_action', 'user_action') - .andWhere('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) - .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }) - .orWhere('user_action.id is null') + .select(['user.id as id', 'user.email as email']) + .leftJoin('user.groups', 'group') + .leftJoin('group.connection', 'connection') + .leftJoin('connection.logs', 'tableLogs') + .leftJoin('user.user_action', 'user_action') + .where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false }) + .andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false }) .andWhere('tableLogs.id is null'); - return await usersQb.getMany(); - } - private async findUserActionWithoutSentMail(userId: string): Promise { - const actionQb = this.userActionRepository - .createQueryBuilder('user_action') - .leftJoinAndSelect('user_action.user', 'user') - .andWhere('user.id = :user_id', { user_id: userId }) - .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }); - return await actionQb.getOne(); + const nonFinishedUsersResult = await nonFinishedActionsQuery.getRawMany(); + const usersWithoutLogsResult = await usersWithoutLogsQuery.getRawMany(); + + const combinedUsers = [...nonFinishedUsersResult, ...usersWithoutLogsResult]; + const uniqueUsersMap = new Map(); + + combinedUsers.forEach((user) => { + if (!uniqueUsersMap.has(user.id)) { + uniqueUsersMap.set(user.id, { id: user.id, email: user.email }); + } + }); + + return Array.from(uniqueUsersMap.values()); } } diff --git a/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts b/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts index c02272db2..c65a693c0 100644 --- a/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts +++ b/backend/src/entities/user-actions/use-cases/check-users-logs-and-update-actions.use.case.ts @@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { LogOperationTypeEnum, OperationResultStatusEnum, UserActionEnum } from '../../../enums/index.js'; -import { getUniqArrayStrings } from '../../../helpers/index.js'; import { Constants } from '../../../helpers/constants/constants.js'; import { TableLogsEntity } from '../../table-logs/table-logs.entity.js'; import { UserActionEntity } from '../user-action.entity.js'; @@ -18,14 +17,17 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd ) {} public async execute(): Promise { console.info('Updating actions started'); - const foundActions: Array = await this.findAllNonFinishedActionsTwoWeeksOld(); - const userIdsFromActions: Array = foundActions.map((action: UserActionEntity) => action.user.id); - const filteredIds: Array = getUniqArrayStrings(userIdsFromActions); - await Promise.allSettled( - filteredIds.map(async (id: string) => { - await this.checkUserLogsAndUpdateAction(id); - }), - ); + const uniqueUserIds: Array = await this.findDistinctUserIdsWithNonFinishedActions(); + + const batchSize = 10; + for (let i = 0; i < uniqueUserIds.length; i += batchSize) { + const batch = uniqueUserIds.slice(i, i + batchSize); + await Promise.allSettled( + batch.map(async (id: string) => { + await this.checkUserLogsAndUpdateAction(id); + }), + ); + } } private async checkUserLogsAndUpdateAction(userId: string): Promise { @@ -51,6 +53,19 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd return await notFinishedActionsQb.getMany(); } + private async findDistinctUserIdsWithNonFinishedActions(): Promise> { + const result = await this.userActionRepository + .createQueryBuilder('user_action') + .select('DISTINCT user.id', 'userId') + .leftJoin('user_action.user', 'user') + .where('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() }) + .andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false }) + .andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED }) + .getRawMany(); + + return result.map((item) => item.userId); + } + private async findSuccessfulTableReceivingUserLogs(userId: string): Promise> { const userLogsQB = this.tableLogsRepository .createQueryBuilder('tableLogs') diff --git a/backend/src/helpers/constants/constants.ts b/backend/src/helpers/constants/constants.ts index 91277a1f9..06b69e42c 100644 --- a/backend/src/helpers/constants/constants.ts +++ b/backend/src/helpers/constants/constants.ts @@ -54,8 +54,9 @@ export const Constants = { CURRENT_TIME_FORMATTED: (): string => { const now = new Date(); - const padString = (n: number) => n.toString().padStart(2, '0'); - return `${padString(now.getHours())}:${padString(now.getMinutes())}:${padString(now.getSeconds())}`; + return now.toISOString(); + // const padString = (n: number) => n.toString().padStart(2, '0'); + // return `${padString(now.getHours())}:${padString(now.getMinutes())}:${padString(now.getSeconds())}`; }, ONE_WEEK_AGO: (): Date => {