Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 86 additions & 24 deletions backend/src/entities/cron-jobs/cron-jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,34 +46,76 @@ export class CronJobsService {
try {
const isJobAdded = await this.insertMorningJob();
if (!isJobAdded) {
console.log('Job already in progress, exiting');
return;
}

await slackPostMessage(`email cron started at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);

let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute();
emails = emails.filter((email) => {
if (email && /^demo_.*@rocketadmin\.com$/i.test(email)) {
return false;
try {
let emails = await this.checkUsersActionsAndMailingUsersUseCase.execute();
console.log(`Retrieved ${emails.length} email addresses from use case`);
await slackPostMessage(
`Successfully retrieved ${emails.length} email addresses from use case`,
Constants.EXCEPTIONS_CHANNELS,
);

const emailsBefore = emails.length;
emails = emails.filter((email) => {
return ValidationHelper.isValidEmail(email);
});
console.log(`Filtered out ${emailsBefore - emails.length} invalid or demo emails`);

await slackPostMessage(
`Found ${emails.length} valid emails. starting messaging`,
Constants.EXCEPTIONS_CHANNELS,
);
const batchSize = 10;
let allMailingResults = [];

for (let i = 0; i < emails.length; i += batchSize) {
const emailsBatch = emails.slice(i, i + batchSize);
console.log(
`Processing email batch ${Math.floor(i / batchSize) + 1} of ${Math.ceil(emails.length / batchSize)}, with ${emailsBatch.length} emails`,
);

try {
const batchResults = await this.emailService.sendRemindersToUsers(emailsBatch);
console.log(`Batch ${Math.floor(i / batchSize) + 1} completed with ${batchResults.length} results`);
allMailingResults = [...allMailingResults, ...batchResults];
} catch (error) {
console.error(`Error processing batch ${Math.floor(i / batchSize) + 1}: ${error.message}`);
Sentry.captureException(error);
}
await new Promise((resolve) => setTimeout(resolve, 1000));
}
return ValidationHelper.isValidEmail(email);
});

await slackPostMessage(`found ${emails.length} valid emails. starting messaging`, Constants.EXCEPTIONS_CHANNELS);
const mailingResults = await this.emailService.sendRemindersToUsers(emails);
if (mailingResults.length === 0) {
const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)';
await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
} else {
await slackPostMessage(JSON.stringify(mailingResults), Constants.EXCEPTIONS_CHANNELS);
// await this.sendEmailResultsToSlack(mailingResults, emails);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
console.log(`Email sending completed. Total results: ${allMailingResults.length}`);

if (allMailingResults.length === 0) {
const mailingResultToString = 'Sending emails triggered, but no emails sent (no users found)';
await slackPostMessage(mailingResultToString, Constants.EXCEPTIONS_CHANNELS);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
} else {
await slackPostMessage(`Sent ${allMailingResults.length} emails successfully`, Constants.EXCEPTIONS_CHANNELS);
// await this.sendEmailResultsToSlack(allMailingResults, emails);
await slackPostMessage(`morning cron finished at ${this.getCurrentTime()}`, Constants.EXCEPTIONS_CHANNELS);
}
} catch (innerError) {
console.error('Detailed error in email processing:', innerError);
const errorMessage = innerError.stack
? `${innerError.message}\n${innerError.stack.split('\n').slice(0, 5).join('\n')}`
: innerError.message;
await slackPostMessage(`Error in email processing: ${errorMessage}`, Constants.EXCEPTIONS_CHANNELS);
Sentry.captureException(innerError);
}
} catch (e) {
await slackPostMessage(`Error in morning cron: ${e.message}`, Constants.EXCEPTIONS_CHANNELS);
console.error('Main cron handler error:', e);
const errorMessage = e.stack ? `${e.message}\n${e.stack.split('\n').slice(0, 5).join('\n')}` : e.message;
await slackPostMessage(`Error in morning cron: ${errorMessage}`, Constants.EXCEPTIONS_CHANNELS);
Sentry.captureException(e);
console.error(e);
} finally {
console.log('Cron job completed at', new Date().toISOString());
}
}

Expand Down Expand Up @@ -126,22 +168,42 @@ export class CronJobsService {
const filteredResults = results.filter((result) => !!result);
const nullResultsCount = results.length - filteredResults.length;
const chunkSize = 20;
const emailsNonFoundInResults = allFoundEmails.filter(
(email) => !filteredResults.some((result) => result?.accepted?.includes(email)),
);

const foundEmails = new Set();
filteredResults.forEach((result) => {
if (result?.accepted) {
result.accepted.forEach((email) => foundEmails.add(email));
}
});

const emailsNonFoundInResults = allFoundEmails.filter((email) => !foundEmails.has(email));

for (let i = 0; i < filteredResults.length; i += chunkSize) {
const chunk = filteredResults.slice(i, i + chunkSize);
const message = this.emailCronResultToSlackString(chunk);
if (!message) {
continue;
}
await slackPostMessage(message, Constants.EXCEPTIONS_CHANNELS);
await new Promise((resolve) => setTimeout(resolve, 100));
}

if (nullResultsCount > 0) {
const timedOutMessage = `The system timed out while sending results to ${nullResultsCount} email addresses`;
const timedOutEmailsMessage = `: \n${emailsNonFoundInResults.join(', ')}\n`;
const fullTimedOutMessage = `${timedOutMessage}${timedOutEmailsMessage}`;
await slackPostMessage(fullTimedOutMessage, Constants.EXCEPTIONS_CHANNELS);
if (emailsNonFoundInResults.length > 100) {
await slackPostMessage(timedOutMessage, Constants.EXCEPTIONS_CHANNELS);
for (let i = 0; i < emailsNonFoundInResults.length; i += 100) {
const emailsChunk = emailsNonFoundInResults.slice(i, i + 100);
await slackPostMessage(
`Failed emails (chunk ${i / 100 + 1}): ${emailsChunk.join(', ')}`,
Constants.EXCEPTIONS_CHANNELS,
);
}
} else {
const timedOutEmailsMessage = `: \n${emailsNonFoundInResults.join(', ')}\n`;
const fullTimedOutMessage = `${timedOutMessage}${timedOutEmailsMessage}`;
await slackPostMessage(fullTimedOutMessage, Constants.EXCEPTIONS_CHANNELS);
}
}
}

Expand Down
23 changes: 17 additions & 6 deletions backend/src/entities/email/email/email.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,25 @@ export class EmailService {
}

public async sendRemindersToUsers(userEmails: Array<string>): Promise<Array<ICronMessagingResults | null>> {
const queue = new PQueue({ concurrency: 8 });
const mailingResults: Array<SMTPTransport.SentMessageInfo | void> = await Promise.all(
userEmails.map(async (email: string) => {
return await queue.add(async () => {
const queue = new PQueue({ concurrency: 3 });

const mailingResults: Array<SMTPTransport.SentMessageInfo | void> = [];

for (const email of userEmails) {
try {
const result = await queue.add(async () => {
return await this.sendReminderToUser(email);
});
}),
);
mailingResults.push(result);
} catch (error) {
this.logger.error(`Failed to send reminder to ${email}: ${error.message}`);
Sentry.captureException(error);
mailingResults.push(null);
}
}

await queue.onIdle();

return this.buildMailingResults(mailingResults);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,34 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio
public async execute(): Promise<Array<string>> {
try {
const distinctUsers = await this.findDistinctUsersForProcessing();
const batchSize = 2;
const queue = new PQueue({ concurrency: 3 });
const batchSize = 10;
const emails: string[] = [];

try {
for (let i = 0; i < distinctUsers.length; i += batchSize) {
const batch = distinctUsers.slice(i, i + batchSize);
await Promise.all(
batch.map((user) =>
queue.add(async () => {
await this.updateOrCreateActionForUser(user);
if (user.email) {
emails.push(user.email.toLowerCase());
}
}),
),
);
const queue = new PQueue({ concurrency: 2 });
for (let i = 0; i < distinctUsers.length; i += batchSize) {
const batch = distinctUsers.slice(i, i + batchSize);
for (const user of batch) {
try {
await queue.add(async () => {
await this.updateOrCreateActionForUser(user);
if (user.email) {
emails.push(user.email.toLowerCase());
}
});
} catch (error) {
console.error(`Error processing user ${user.id}: ${error.message}`);
Sentry.captureException(error);
}
}

await queue.onIdle();
} finally {
queue.clear();
if (i + batchSize < distinctUsers.length) {
await new Promise((resolve) => setTimeout(resolve, 500));
}
}

queue.clear();
return getUniqArrayStrings(emails);
} catch (e) {
console.error('Error in CheckUsersActionsAndMailingUsersUseCase.execute():', e);
Sentry.captureException(e);
console.error(e);
return [];
}
}
Expand Down Expand Up @@ -78,38 +78,65 @@ export class CheckUsersActionsAndMailingUsersUseCase implements ICheckUsersActio
}

private async findDistinctUsersForProcessing(): Promise<Array<{ id: string; email: string }>> {
const nonFinishedActionsQuery = this.userActionRepository
.createQueryBuilder('user_action')
.select(['user.id as id', 'user.email as email'])
.innerJoin('user_action.user', 'user')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false })
.andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED });

const usersWithoutLogsQuery = this.userRepository
.createQueryBuilder('user')
.select(['user.id as id', 'user.email as email'])
.leftJoin('user.groups', 'group')
.leftJoin('group.connection', 'connection')
.leftJoin('connection.logs', 'tableLogs')
.leftJoin('user.user_action', 'user_action')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false })
.andWhere('tableLogs.id is null');

const nonFinishedUsersResult = await nonFinishedActionsQuery.getRawMany();
const usersWithoutLogsResult = await usersWithoutLogsQuery.getRawMany();

const combinedUsers = [...nonFinishedUsersResult, ...usersWithoutLogsResult];
const uniqueUsersMap = new Map<string, { id: string; email: string }>();

combinedUsers.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
try {
const nonFinishedActionsQuery = this.userActionRepository
.createQueryBuilder('user_action')
.select(['user.id as id', 'user.email as email'])
.innerJoin('user_action.user', 'user')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('user_action.createdAt <= :date_to', { date_to: Constants.ONE_WEEK_AGO() })
.andWhere('user_action.mail_sent = :mail_sent', { mail_sent: false })
.andWhere('user_action.message = :message', { message: UserActionEnum.CONNECTION_CREATION_NOT_FINISHED });
const pageSize = 100;
let page = 0;
let hasMore = true;
const uniqueUsersMap = new Map<string, { id: string; email: string }>();
while (hasMore) {
const paginatedQuery = nonFinishedActionsQuery.skip(page * pageSize).take(pageSize);
const batch = await paginatedQuery.getRawMany();
if (batch.length === 0) {
hasMore = false;
} else {
batch.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
}
});
page++;
}
}
page = 0;
hasMore = true;
const usersWithoutLogsQuery = this.userRepository
.createQueryBuilder('user')
.select(['user.id as id', 'user.email as email'])
.leftJoin('user.groups', 'group')
.leftJoin('group.connection', 'connection')
.leftJoin('connection.logs', 'tableLogs')
.leftJoin('user.user_action', 'user_action')
.where('user.isDemoAccount = :isDemoAccount', { isDemoAccount: false })
.andWhere('(user_action.mail_sent = :mail_sent OR user_action.id is null)', { mail_sent: false })
.andWhere('tableLogs.id is null');
while (hasMore) {
const paginatedQuery = usersWithoutLogsQuery.skip(page * pageSize).take(pageSize);
const batch = await paginatedQuery.getRawMany();
if (batch.length === 0) {
hasMore = false;
} else {
batch.forEach((user) => {
if (!uniqueUsersMap.has(user.id)) {
uniqueUsersMap.set(user.id, { id: user.id, email: user.email });
}
});
page++;
}
}
});

return Array.from(uniqueUsersMap.values());
return Array.from(uniqueUsersMap.values());
} catch (error) {
console.error('Error in findDistinctUsersForProcessing:', error);
Sentry.captureException(error);
return [];
}
}
}
Loading