Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions backend/src/entities/cron-jobs/cron-jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,9 @@ export class CronJobsService {
if (!isJobAdded) {
return;
}
await slackPostMessage(
`midnight cron started at ${Constants.CURRENT_TIME_FORMATTED()}`,
Constants.EXCEPTIONS_CHANNELS,
);
await slackPostMessage(`midnight cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
await this.checkUsersLogsAndUpdateActionsUseCase.execute();
await slackPostMessage(
`midnight cron finished at ${Constants.CURRENT_TIME_FORMATTED()}`,
Constants.EXCEPTIONS_CHANNELS,
);
await slackPostMessage(`midnight cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
} catch (e) {
Sentry.captureException(e);
console.error(e);
Expand All @@ -55,10 +49,7 @@ export class CronJobsService {
return;
}

await slackPostMessage(
`email cron started at ${Constants.CURRENT_TIME_FORMATTED()}`,
Constants.EXCEPTIONS_CHANNELS,
);
await slackPostMessage(`email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);

let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute();
emails = emails.filter((email) => {
Expand All @@ -73,17 +64,11 @@ export class CronJobsService {
if (mailingResults.length === 0) {
const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)';
await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS);
await slackPostMessage(
`morning cron finished at ${Constants.CURRENT_TIME_FORMATTED()}`,
Constants.EXCEPTIONS_CHANNELS,
);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
} else {
await slackPostMessage(JSON.stringify(mailingResults), Constants.EXCEPTIONS_CHANNELS);
// await this.sendEmailResultsToSlack(mailingResults, emails);
await slackPostMessage(
`morning cron finished at ${Constants.CURRENT_TIME_FORMATTED()}`,
Constants.EXCEPTIONS_CHANNELS,
);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
}
} catch (e) {
await slackPostMessage(`Error in morning cron: ${e.message}`, Constants.EXCEPTIONS_CHANNELS);
Expand All @@ -94,7 +79,9 @@ export class CronJobsService {

@Cron(CronExpression.EVERY_DAY_AT_NOON)
public async clearJobList(): Promise<void> {
await slackPostMessage(`clearing cron job list at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
await this.jobListRepository.clear();
await slackPostMessage(`cron job list cleared at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
}

private async insertMidnightJob(): Promise<boolean> {
Expand Down Expand Up @@ -159,4 +146,9 @@ export class CronJobsService {
await slackPostMessage(fullTimedOutMessage, Constants.EXCEPTIONS_CHANNELS);
}
}

private getCurrentTime(): string {
const now = new Date();
return `${now.getHours()}:${now.getMinutes()}:${now.getSeconds()}`;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,32 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio
) {}
public async execute(): Promise<Array<string>> {
try {
const nonFinishedUsersActions = await this.findAllNonFinishedActionsTwoWeeksOld();
const usersWithoutLogs = await this.findUsersWithoutLogs();
const usersFromActions: Array<UserEntity> = nonFinishedUsersActions.map(
(action: UserActionEntity) => action.user,
);
const allUsersArray: Array<UserEntity> = usersWithoutLogs.concat(usersFromActions);
const filteredUsers: Array<{ id: string; email: string }> = Array.from(
new Set(allUsersArray.map((u) => u.id)),
).map((id) => {
return {
id: id,
email: allUsersArray.find((u) => u.id === id).email,
};
});
const distinctUsers = await this.findDistinctUsersForProcessing();
const batchSize = 10;
const queue = new PQueue({ concurrency: 3 });
await Promise.all(filteredUsers.map((u) => queue.add(() => this.updateOrCreateActionForUser(u))));
const userEmails = filteredUsers.map((u) => u.email?.toLowerCase());
return getUniqArrayStrings(userEmails);
const emails: string[] = [];

try {
for (let i = 0; i < distinctUsers.length; i += batchSize) {
const batch = distinctUsers.slice(i, i + batchSize);
await Promise.all(
batch.map((user) =>
queue.add(async () => {
await this.updateOrCreateActionForUser(user);
if (user.email) {
emails.push(user.email.toLowerCase());
}
}),
),
);
}

await queue.onIdle();
} finally {
queue.clear();
}

return getUniqArrayStrings(emails);
} catch (e) {
Sentry.captureException(e);
console.error(e);
Expand All @@ -60,37 +68,48 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio
await this.userActionRepository.save(newUserActionEntity);
}

private async findAllNonFinishedActionsTwoWeeksOld(): Promise<Array<UserActionEntity>> {
const notFinishedActionsQb = this.userActionRepository
private async findUserActionWithoutSentMail(userId: string): Promise<UserActionEntity> {
const actionQb = this.userActionRepository
.createQueryBuilder('user_action')
.leftJoinAndSelect('user_action.user', 'user')
.andWhere('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('user.id = :user_id', { user_id: userId })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false });
return await actionQb.getOne();
}

private async findDistinctUsersForProcessing(): Promise<Array<{ id: string; email: string }>> {
const nonFinishedActionsQuery = this.userActionRepository
.createQueryBuilder('user_action')
.select(['user.id as id', 'user.email as email'])
.innerJoin('user_action.user', 'user')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false })
.andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED });
return await notFinishedActionsQb.getMany();
}

private async findUsersWithoutLogs(): Promise<Array<UserEntity>> {
const usersQb = this.userRepository
const usersWithoutLogsQuery = this.userRepository
.createQueryBuilder('user')
.leftJoinAndSelect('user.groups', 'group')
.leftJoinAndSelect('group.connection', 'connection')
.leftJoinAndSelect('connection.logs', 'tableLogs')
.leftJoinAndSelect('user.user_action', 'user_action')
.andWhere('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false })
.orWhere('user_action.id is null')
.select(['user.id as id', 'user.email as email'])
.leftJoin('user.groups', 'group')
.leftJoin('group.connection', 'connection')
.leftJoin('connection.logs', 'tableLogs')
.leftJoin('user.user_action', 'user_action')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false })
.andWhere('tableLogs.id is null');
return await usersQb.getMany();
}

private async findUserActionWithoutSentMail(userId: string): Promise<UserActionEntity> {
const actionQb = this.userActionRepository
.createQueryBuilder('user_action')
.leftJoinAndSelect('user_action.user', 'user')
.andWhere('user.id = :user_id', { user_id: userId })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false });
return await actionQb.getOne();
const nonFinishedUsersResult = await nonFinishedActionsQuery.getRawMany();
const usersWithoutLogsResult = await usersWithoutLogsQuery.getRawMany();

const combinedUsers = [...nonFinishedUsersResult, ...usersWithoutLogsResult];
const uniqueUsersMap = new Map<string, { id: string; email: string }>();

combinedUsers.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
}
});

return Array.from(uniqueUsersMap.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { LogOperationTypeEnum, OperationResultStatusEnum, UserActionEnum } from '../../../enums/index.js';
import { getUniqArrayStrings } from '../../../helpers/index.js';
import { Constants } from '../../../helpers/constants/constants.js';
import { TableLogsEntity } from '../../table-logs/table-logs.entity.js';
import { UserActionEntity } from '../user-action.entity.js';
Expand All @@ -18,14 +17,17 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd
) {}
public async execute(): Promise<void> {
console.info('Updating actions started');
const foundActions: Array<UserActionEntity> = await this.findAllNonFinishedActionsTwoWeeksOld();
const userIdsFromActions: Array<string> = foundActions.map((action: UserActionEntity) => action.user.id);
const filteredIds: Array<string> = getUniqArrayStrings(userIdsFromActions);
await Promise.allSettled(
filteredIds.map(async (id: string) => {
await this.checkUserLogsAndUpdateAction(id);
}),
);
const uniqueUserIds: Array<string> = await this.findDistinctUserIdsWithNonFinishedActions();

const batchSize = 10;
for (let i = 0; i < uniqueUserIds.length; i += batchSize) {
const batch = uniqueUserIds.slice(i, i + batchSize);
await Promise.allSettled(
batch.map(async (id: string) => {
await this.checkUserLogsAndUpdateAction(id);
}),
);
}
}

private async checkUserLogsAndUpdateAction(userId: string): Promise<void> {
Expand All @@ -51,6 +53,19 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd
return await notFinishedActionsQb.getMany();
}

private async findDistinctUserIdsWithNonFinishedActions(): Promise<Array<string>> {
const result = await this.userActionRepository
.createQueryBuilder('user_action')
.select('DISTINCT user.id', 'userId')
.leftJoin('user_action.user', 'user')
.where('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false })
.andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED })
.getRawMany();

return result.map((item) => item.userId);
}

private async findSuccessfulTableReceivingUserLogs(userId: string): Promise<Array<TableLogsEntity>> {
const userLogsQB = this.tableLogsRepository
.createQueryBuilder('tableLogs')
Expand Down
5 changes: 3 additions & 2 deletions backend/src/helpers/constants/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ export const Constants = {

CURRENT_TIME_FORMATTED: (): string => {
const now = new Date();
const padString = (n: number) => n.toString().padStart(2, '0');
return `${padString(now.getHours())}:${padString(now.getMinutes())}:${padString(now.getSeconds())}`;
return now.toISOString();
// const padString = (n: number) => n.toString().padStart(2, '0');
// return `${padString(now.getHours())}:${padString(now.getMinutes())}:${padString(now.getSeconds())}`;
},

ONE_WEEK_AGO: (): Date => {
Expand Down