Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions backend/src/entities/cron-jobs/cron-jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,34 @@ export class CronJobsService {
return;
}

await slackPostMessage(`email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
await slackPostMessage(`Email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);

try {
let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute();
console.log(`Retrieved ${emails.length} email addresses from use case`);
await slackPostMessage(
`Successfully retrieved ${emails.length} email addresses from use case`,
`Retrieved ${emails.length} email addresses from database`,
Constants.EXCEPTIONS_CHANNELS,
);

const emailsBefore = emails.length;
emails = emails.filter((email) => {
return ValidationHelper.isValidEmail(email);
});
console.log(`Filtered out ${emailsBefore - emails.length} invalid or demo emails`);

const filteredOutEmailsCount = emailsBefore - emails.length;

await slackPostMessage(
`Found ${emails.length} valid emails. starting messaging`,
`Found ${emails.length} valid emails${filteredOutEmailsCount ? `. Filtered out ${filteredOutEmailsCount} invalid or demo emails` : ``}. Starting messaging`,
Constants.EXCEPTIONS_CHANNELS,
);

const batchSize = 10;
let allMailingResults = [];

for (let i = 0; i < emails.length; i += batchSize) {
const emailsBatch = emails.slice(i, i + batchSize);
console.log(
`Processing email batch ${Math.floor(i / batchSize) + 1} of ${Math.ceil(emails.length / batchSize)}, with ${emailsBatch.length} emails`,
);

try {
const batchResults = await this.emailService.sendRemindersToUsers(emailsBatch);
console.log(`Batch ${Math.floor(i / batchSize) + 1} completed with ${batchResults.length} results`);
allMailingResults = [...allMailingResults, ...batchResults];
} catch (error) {
console.error(`Error processing batch ${Math.floor(i / batchSize) + 1}: ${error.message}`);
Expand All @@ -90,15 +86,12 @@ export class CronJobsService {
await new Promise((resolve) => setTimeout(resolve, 1000));
}

console.log(`Email sending completed. Total results: ${allMailingResults.length}`);

if (allMailingResults.length === 0) {
const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)';
await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
} else {
await slackPostMessage(`Sent ${allMailingResults.length} emails successfully`, Constants.EXCEPTIONS_CHANNELS);
// await this.sendEmailResultsToSlack(allMailingResults, emails);
await this.sendEmailResultsToSlack(allMailingResults, emails);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
}
} catch (innerError) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio
const distinctUsers = await this.findDistinctUsersForProcessing();
const batchSize = 10;
const emails: string[] = [];
const queue = new PQueue({ concurrency: 2 });
const queue = new PQueue({ concurrency: 5 });
for (let i = 0; i < distinctUsers.length; i += batchSize) {
const batch = distinctUsers.slice(i, i + batchSize);
for (const user of batch) {
Expand Down Expand Up @@ -81,56 +81,43 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio
try {
const nonFinishedActionsQuery = this.userActionRepository
.createQueryBuilder('user_action')
.select(['user.id as id', 'user.email as email'])
.select('DISTINCT user.id as id')
.addSelect('user.email as email')
.innerJoin('user_action.user', 'user')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false })
.andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED });
const pageSize = 100;
let page = 0;
let hasMore = true;

const uniqueUsersMap = new Map<string, { id: string; email: string }>();
while (hasMore) {
const paginatedQuery = nonFinishedActionsQuery.skip(page * pageSize).take(pageSize);
const batch = await paginatedQuery.getRawMany();
if (batch.length === 0) {
hasMore = false;
} else {
batch.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
}
});
page++;

const nonFinishedUsers = await nonFinishedActionsQuery.getRawMany();

nonFinishedUsers.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
}
}
page = 0;
hasMore = true;
});

const usersWithoutLogsQuery = this.userRepository
.createQueryBuilder('user')
.select(['user.id as id', 'user.email as email'])
.select('DISTINCT user.id as id')
.addSelect('user.email as email')
.leftJoin('user.groups', 'group')
.leftJoin('group.connection', 'connection')
.leftJoin('connection.logs', 'tableLogs')
.leftJoin('user.user_action', 'user_action')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false })
.andWhere('tableLogs.id is null');
while (hasMore) {
const paginatedQuery = usersWithoutLogsQuery.skip(page * pageSize).take(pageSize);
const batch = await paginatedQuery.getRawMany();
if (batch.length === 0) {
hasMore = false;
} else {
batch.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
}
});
page++;

const usersWithoutLogs = await usersWithoutLogsQuery.getRawMany();

usersWithoutLogs.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
}
}
});

return Array.from(uniqueUsersMap.values());
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd
private readonly tableLogsRepository: Repository<TableLogsEntity>,
) {}
public async execute(): Promise<void> {
console.info('Updating actions started');
const uniqueUserIds: Array<string> = await this.findDistinctUserIdsWithNonFinishedActions();

const batchSize = 3;
Expand All @@ -31,8 +30,8 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd
}

private async checkUserLogsAndUpdateAction(userId: string): Promise<void> {
const successFullTableReceivingUserLogs = await this.findSuccessfulTableReceivingUserLogs(userId);
if (!successFullTableReceivingUserLogs || successFullTableReceivingUserLogs.length === 0) {
const hasSuccessfulTableReceivingLogs = await this.findSuccessfulTableReceivingUserLogs(userId);
if (!hasSuccessfulTableReceivingLogs) {
return;
}
const foundUserAction = await this.findNonFinishedConnectionCreationUserAction(userId);
Expand All @@ -43,16 +42,6 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd
await this.userActionRepository.save(foundUserAction);
}

private async findAllNonFinishedActionsTwoWeeksOld(): Promise<Array<UserActionEntity>> {
const notFinishedActionsQb = this.userActionRepository
.createQueryBuilder('user_action')
.leftJoinAndSelect('user_action.user', 'user')
.andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false })
.andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED });
return await notFinishedActionsQb.getMany();
}

private async findDistinctUserIdsWithNonFinishedActions(): Promise<Array<string>> {
const result = await this.userActionRepository
.createQueryBuilder('user_action')
Expand All @@ -66,24 +55,27 @@ export class CheckUsersLogsAndUpdateActionsUseCase implements ICheckUsersLogsAnd
return result.map((item) => item.userId);
}

private async findSuccessfulTableReceivingUserLogs(userId: string): Promise<Array<TableLogsEntity>> {
const userLogsQB = this.tableLogsRepository
private async findSuccessfulTableReceivingUserLogs(userId: string): Promise<boolean> {
const userLogsCount = await this.tableLogsRepository
.createQueryBuilder('tableLogs')
.leftJoinAndSelect('tableLogs.connection_id', 'connection')
.leftJoinAndSelect('connection.groups', 'group')
.leftJoinAndSelect('group.users', 'user')
.andWhere('user.id = :user_id', { user_id: userId })
.leftJoin('tableLogs.connection_id', 'connection')
.leftJoin('connection.groups', 'group')
.leftJoin('group.users', 'user')
.where('user.id = :user_id', { user_id: userId })
.andWhere('tableLogs.operationType = :operationType', { operationType: LogOperationTypeEnum.rowsReceived })
.andWhere('tableLogs.operationStatusResult = :operationStatusResult', {
operationStatusResult: OperationResultStatusEnum.successfully,
});
return await userLogsQB.getMany();
})
.select('1')
.limit(1)
.getRawOne();
return !!userLogsCount;
}

private async findNonFinishedConnectionCreationUserAction(userId: string): Promise<UserActionEntity> {
const actionQb = this.userActionRepository
.createQueryBuilder('user_action')
.leftJoinAndSelect('user_action.user', 'user')
.leftJoin('user_action.user', 'user')
.andWhere('user.id = :user_id', { user_id: userId })
.andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED });
return await actionQb.getOne();
Expand Down
9 changes: 9 additions & 0 deletions backend/src/helpers/constants/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ export const Constants = {
return today;
},

ONE_MONTH_AND_A_WEEK_AGO: (): Date => {
const today = new Date();
const oneMonthAgo = today.getMonth() - 1;
const oneWeekAgo = today.getDate() - 7;
today.setMonth(oneMonthAgo);
today.setDate(oneWeekAgo);
return today;
},

TWO_WEEKS_AGO: (): Date => {
const currentDate = Date.now();
const twoWeeksInMs = 1209600000;
Expand Down
Loading