Skip to content

Commit b154e6f

Browse files
author
Shiva Kayathi
committed
WIP APPS-26 -- x-notify add bull queue and Jobs
1 parent 14fbca8 commit b154e6f

11 files changed

Lines changed: 447 additions & 15 deletions

File tree

controllers/mailing.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ const dbConn = module.parent.parent.exports.dbConn;
1111
const ObjectId = require('mongodb').ObjectId;
1212

1313
const { Worker } = require('worker_threads');
14+
const { createJob } = require("../jobs/bullConfig");
15+
16+
console.log("mailing " + typeof createJob)
17+
console.log(createJob)
1418

1519
const _mailingState = {
1620
cancelled: "cancelled",
@@ -395,8 +399,18 @@ async function mailingUpdate( mailingId, newHistoryState, options ) {
395399
async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBody ) {
396400

397401
// When completed, change state to "sent"
398-
402+
399403
// Start the worker.
404+
workerData = {
405+
topicId: topicId,
406+
mailingBody: mailingBody,
407+
mailingSubject: mailingSubject,
408+
typeMailing: "msgUpdates",
409+
sentTo: "allSubs",
410+
dbConn: true //dbConn
411+
};
412+
createJob("getSubscribers", workerData);
413+
/*
400414
const worker = new Worker( './controllers/workerSendEmail.js', {
401415
workerData: {
402416
topicId: topicId,
@@ -408,6 +422,11 @@ async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBo
408422
}
409423
});
410424
425+
426+
427+
428+
429+
411430
worker.on('message', function(msg){
412431
413432
if ( msg.completed ) {
@@ -419,12 +438,14 @@ async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBo
419438
}
420439
421440
console.log( msg.msg );
441+
422442
});
423443
424444
worker.on('error', function(msg){
425445
console.log( "Send to subs - Worker ERRROR: " + msg );
426446
});
427-
447+
*/
448+
mailingUpdate( mailingId, _mailingState.sent, { historyState: _mailingState.sending } );
428449
}
429450

430451
// Simple worker to send mailing

controllers/mailing_view.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
const mustache = require('mustache');
1111
const fsPromises = require('fs').promises;
1212
const mailing = require('./mailing');
13+
//const subscription = require('./subscriptions');
14+
1315
const _mailingState = mailing.mailingState;
1416
const _baseRedirFolder = ( process.env.baseFolder || "" ) + "/api/v1/mailing/";
1517

controllers/subscriptions.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,7 +1103,8 @@ exports.simulateAddPost = async ( req, res, next ) => {
11031103
/**
11041104
* This is the future REST endpoint handler function for queuing a mailing with Notify
11051105
*/
1106-
/* It is commented until the mailing.js module is updated to use Bull to make the API call. Related to APPS-53 work.
1106+
//It is commented until the mailing.js module is updated to use Bull to make the API call. Related to APPS-53 work.
1107+
/*
11071108
exports.sendMailing = async ( req, res, next ) => {
11081109
const email = req.body.email,
11091110
templateId = req.body.templateId,
@@ -1126,8 +1127,8 @@ exports.sendMailing = async ( req, res, next ) => {
11261127
11271128
11281129
res.json( _successJSO );
1129-
}*/
1130-
1130+
}
1131+
*/
11311132
/**
11321133
* This is the function for queuing a subscriber confirmation email
11331134
* send via notify.

controllers/workerSendEmail.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ async function init() {
6464
console.log( e );
6565
throw new Error( "Worker: Can find the topic: " + topicId );
6666
});
67-
67+
console.log("workerSendEmail -- topic")
68+
console.log(topic)
6869
let templateId,
6970
notifyKey = topic.notifyKey;
7071

@@ -108,6 +109,8 @@ async function init() {
108109
* Send the mailing
109110
*
110111
*/
112+
console.log("worker _notifyEndPoint " + _notifyEndPoint)
113+
console.log("notifyKey " + notifyKey);
111114
let notifyClient = new NotifyClient( _notifyEndPoint, notifyKey );
112115

113116
//console.log( "_notifyEndPoint: " + _notifyEndPoint );

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ services:
1717
- "6379:6379"
1818
networks:
1919
- x-notify-net
20-
x-notify:
20+
x_notify:
2121
build: ./
2222
container_name: x-notify
2323
ports:

helpers/sendEmail.js

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
const chalk = require('chalk'); // To color message in console log
2+
3+
const MongoClient = require('mongodb').MongoClient;
4+
const { workerData, parentPort } = require('worker_threads')
5+
6+
const NotifyClient = require('notifications-node-client').NotifyClient;
7+
8+
const ObjectId = require('mongodb').ObjectId;
9+
//const dbConn = module.parent.parent.exports.dbConn;
10+
11+
const processEnv = process.env,
12+
_notifyEndPoint = processEnv.notifyEndPoint || "https://api.notification.alpha.canada.ca",
13+
_unsubBaseURL = process.env.removeURL || "https://apps.canada.ca/x-notify/subs/remove/",
14+
_subsLinkSuffix = process.env.subsLinkSuffix || "853e0212b92a127"
15+
16+
17+
let dbConn, notifyKey;
18+
19+
MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ).then( ( mongoInstance ) => {
20+
21+
dbConn = mongoInstance.db( processEnv.MONGODB_NAME || 'subs' );
22+
23+
}).catch( (e) => { console.log( "%s Worker MongoDB ERRROR: %s", chalk.red('✗'), e ) } );
24+
25+
26+
exports.getSubscribers = async (job, done, createJob) => {
27+
console.log("job data -----> ")
28+
console.log(job)
29+
30+
// Ensure we have received all the data
31+
if ( !job.mailingBody || !job.mailingSubject ) {
32+
throw new Error( "Send Email: No email body" );
33+
}
34+
35+
if ( !job.topicId ) {
36+
throw new Error( "Send Email: No topicId selected" );
37+
}
38+
39+
/*
40+
* Get mailing notify information
41+
*
42+
*/
43+
console.log("module.parent.parent.exports.dbConn " + module.parent.parent.exports )
44+
let topic = await dbConn.collection( "topics" ).findOne(
45+
{ _id: job.topicId },
46+
{ projection: {
47+
nTemplateMailingId: 1,
48+
templateId: 1,
49+
notifyKey: 1,
50+
}
51+
} ).catch( (e) => {
52+
console.log( "sendEmail-getTopic" );
53+
console.log( e );
54+
throw new Error( "sendEmail: Can't find the topic: " + job.topicId );
55+
});
56+
57+
let templateId;
58+
notifyKey = topic.notifyKey;
59+
60+
61+
if ( !topic.nTemplateMailingId ) {
62+
throw new Error( "Worker: There is no mailing template associated with : " + topicId );
63+
}
64+
65+
console.log("topic.nTemplateMailingId " + topic.nTemplateMailingId);
66+
67+
// Get the correct notify email template
68+
if ( job.typeMailing === "msgUpdates" ) {
69+
templateId = topic.nTemplateMailingId;
70+
} else if ( job.typeMailing === "confirmSubs" ) {
71+
templateId = topic.templateId;
72+
} else {
73+
throw new Error( "Worker: Invalid type mailing, was : " + job.typeMailing );
74+
}
75+
76+
/*
77+
* Get list of confirmed subscribers
78+
*
79+
*/
80+
let listEmail = [];
81+
82+
if ( Array.isArray( job.sentTo ) ) {
83+
listEmail = sentTo;
84+
} else if ( job.sentTo === "allSubs" ) {
85+
listEmail = await getConfirmedSubscriberAsArray( job.topicId );
86+
}
87+
88+
// No subscribers
89+
if ( !listEmail.length ){
90+
console.log( "Worker: No subscriber" );
91+
92+
//parentPort.postMessage( { msg: "No subscriber" } );
93+
94+
}
95+
let emailData = {
96+
listEmail : listEmail,
97+
notifyKey : notifyKey,
98+
emailData : job,
99+
100+
}
101+
console.log("sendM+EMAil")
102+
console.log(typeof createJob);
103+
createJob("sendEmailQueue", emailData);
104+
}
105+
106+
/*
107+
* Utilities function
108+
*/
109+
getConfirmedSubscriberAsArray = async ( topicId ) => {
110+
111+
// Get all the emails for the given topic
112+
let docs = await dbConn.collection( "subsConfirmed" ).find(
113+
{
114+
topicId: topicId
115+
},
116+
{
117+
projection: {
118+
email: 1,
119+
subscode: 1
120+
}
121+
}
122+
);
123+
124+
let docsItems = await docs.toArray();
125+
126+
return docsItems;
127+
};
128+
129+
exports.processSendEmail = async (job, done) => {
130+
//console.log("emailListv " + job)
131+
console.log("sendEmail _notifyEndPoint " + _notifyEndPoint)
132+
console.log("notifyKey " + job.notifyKey);
133+
134+
135+
notifyClient = new NotifyClient( _notifyEndPoint, job.notifyKey );
136+
let listEmail = job.listEmail;
137+
console.log("sendEmail listEmail " + listEmail);
138+
let i, i_len = listEmail.length, i_cache;
139+
for( i = 0; i !== i_len; i++) {
140+
i_cache = listEmail[ i ];
141+
142+
const { email, subscode } = i_cache;
143+
144+
const userCodeUrl = ( subscode.id ? subscode.toHexString() : subscode );
145+
146+
//console.log( "Worker: Send for : " + email );
147+
148+
if ( !email ) {
149+
continue;
150+
}
151+
152+
parentPort.postMessage( { msg: "Send for : " + email } );
153+
154+
155+
console.log( "templateId: " + templateId );
156+
console.log( "email: " + email );
157+
console.log( "subject: " + mailingSubject );
158+
console.log( "body: " + mailingBody );
159+
console.log( "unsub_link: " + _unsubBaseURL + userCodeUrl + "/" + _subsLinkSuffix );
160+
console.log( "reference: " + "x-notify_" + typeMailing );
161+
162+
notifyClient.sendEmail( templateId, email,
163+
{
164+
personalisation: {
165+
body: mailingBody,
166+
subject: mailingSubject,
167+
unsub_link: _unsubBaseURL + userCodeUrl + "/" + _subsLinkSuffix
168+
},
169+
reference: "x-notify_" + typeMailing
170+
}).catch( ( e ) => {
171+
// Log the Notify errors
172+
// console.log( "Error in Notify" );
173+
// console.log( e );
174+
175+
//parentPort.postMessage( { msg: "worker-Error in Notify" } );
176+
177+
const currDate = new Date(),
178+
currDateTime = currDate.getTime(),
179+
errDetails = e.error.errors[0],
180+
statusCode = e.error.status_code,
181+
msg = errDetails.message;
182+
183+
184+
185+
if ( statusCode === 400 && msg.indexOf( "email_address" ) !== -1 ) {
186+
187+
//
188+
// We need to remove that user and log it
189+
//
190+
// Removal of bad email should be done after 25 min, same delay used to the not-before
191+
// The following task need to be quoeud and delayed. It could be addressed at the same time of APPS-26
192+
//dbConn.collection( "subsUnconfirmed" ).findOneAndDelete(
193+
// {
194+
// email: email
195+
// }
196+
//)
197+
//dbConn.collection( "subsExist" ).findOneAndDelete(
198+
// {
199+
// e: email
200+
// }
201+
//)
202+
203+
204+
// Log
205+
dbConn.collection( "notify_badEmail_logs" ).insertOne(
206+
{
207+
createdAt: currDate,
208+
code: userCodeUrl,
209+
email: email
210+
}
211+
).catch( (e2) => {
212+
console.log( "worker-sendNotifyConfirmEmail: notify_badEmail_logs: " + userCodeUrl );
213+
console.log( e2 );
214+
console.log( e );
215+
});
216+
217+
} else if ( statusCode === 429 ) {
218+
219+
//
220+
// This is a rate limit error, the system should notify us
221+
//
222+
dbConn.collection( "notify_tooManyReq_logs" ).insertOne(
223+
{
224+
createdAt: currDate,
225+
email: email,
226+
code: userCodeUrl,
227+
templateId: templateId,
228+
details: msg
229+
}
230+
).catch( (e2) => {
231+
console.log( "worker-sendNotifyConfirmEmail: notify_tooManyReq_logs: " + userCodeUrl );
232+
console.log( e2 );
233+
console.log( e );
234+
});
235+
236+
//
237+
// Try to email us (only with the predefined interval)
238+
//
239+
if ( _notifyUsNotBeforeTimeLimit <= currDateTime ) {
240+
241+
letUsKnow( "429 Too Many Request error", {
242+
type: "ratelimit",
243+
currTime: currDateTime,
244+
lastTime: _notifyUsNotBeforeTimeLimit
245+
},
246+
true );
247+
248+
// Readjust the limit for the next period
249+
_notifyUsNotBeforeTimeLimit = currDateTime + _notifyUsTimeLimit;
250+
251+
}
252+
253+
} else {
254+
255+
//
256+
// Any other kind of error - https://docs.notifications.service.gov.uk/node.html#send-an-email-error-codes
257+
//
258+
// notify_logs entry - this can be async
259+
dbConn.collection( "notify_logs" ).insertOne(
260+
{
261+
createdAt: currDate,
262+
templateId: templateId,
263+
e: errDetails.error,
264+
msg: msg,
265+
statusCode: statusCode,
266+
err: e.toString(),
267+
code: userCodeUrl
268+
}
269+
).catch( (e2) => {
270+
console.log( "worker-sendNotifyConfirmEmail: notify_logs: " + userCodeUrl );
271+
console.log( e2 );
272+
console.log( e );
273+
});
274+
275+
}
276+
277+
console.log( "worker-sendNotifyConfirmEmail: sendEmail " + userCodeUrl );
278+
mailingUpdate( mailingId, _mailingState.sent, { historyState: _mailingState.sending } );
279+
});
280+
}
281+
}
282+

0 commit comments

Comments
 (0)