-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsyncClient.js
More file actions
129 lines (98 loc) · 3.73 KB
/
syncClient.js
File metadata and controls
129 lines (98 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
'use strict';
const topLogPrefix = 'larvitamsync: syncClient.js: ',
Intercom = require('larvitamintercom'),
LUtils = require('larvitutils'),
lUtils = new LUtils(),
http = require('http');
/**
 * Sync client. Opens a temporary intercom of its own (using the connection
 * string of the supplied intercom), subscribes to the configured exchange and
 * sends a {'action': 'requestDump'} message on it. Every message received on
 * the subscription is forwarded to SyncClient.prototype.handleMsg().
 *
 * @param {object}   options
 * @param {object}   options.intercom         - Required. Only its conStr is used, to create the temporary intercom
 * @param {string}   options.exchange         - Exchange to subscribe to and to send the dump request on
 * @param {object}   [options.log]            - Logger. When omitted a new lUtils.Log() is created and stored on options
 * @param {object}   [options.requestOptions] - Not used here; read by handleMsg() when building the http request
 * @param {function} cb                       - Called with an error when the setup fails; also handed to handleMsg() for each incoming message
 * @throws {Error} When options.intercom is missing
 */
function SyncClient(options, cb) {
	const logPrefix = topLogPrefix + 'SyncClient() - ',
		that = this;

	// Without this guard a missing options argument would throw an opaque
	// TypeError before the descriptive "options.intercom is required!" below
	if ( ! options) {
		options = {};
	}

	that.options = options;
	that.responseReceived = false;
	that.extIntercom = that.options.intercom;

	if ( ! options.log) {
		options.log = new lUtils.Log();
	}

	that.log = options.log;

	// A missing callback would otherwise only surface as a TypeError deep inside
	// an asynchronous intercom callback. Fall back to logging the error instead.
	if (typeof cb !== 'function') {
		cb = function (err) {
			if (err) {
				that.log.error(logPrefix + err.message);
			}
		};
	}

	// We are strictly in need of the intercom!
	if ( ! that.extIntercom) {
		const err = new Error('options.intercom is required!');

		that.log.error(logPrefix + err.message);
		throw err;
	}

	// Reconnect so we have a fresh instance of intercom so we do not interfere with others
	that.log.verbose(logPrefix + 'Starting temporary intercom');

	if (that.extIntercom.conStr === 'loopback interface') {
		that.log.warn(logPrefix + 'Running intercom on "loopback interface", this is probably not what you want');
	}

	that.intercom = new Intercom(that.extIntercom.conStr);

	that.intercom.on('ready', function (err) {
		if (err) return cb(err);

		that.log.info(logPrefix + 'started on exchange: "' + that.options.exchange + '"');

		that.intercom.subscribe({'exchange': that.options.exchange}, function (message, ack) {
			// Wrapped in a closure on purpose: handing that.handleMsg over directly
			// would detach it from the instance, leaving "this" unset inside it
			that.handleMsg(message, ack, cb);
		}, function (err) {
			if (err) return cb(err);

			// Subscription is in place, now ask for a dump to be announced on the exchange
			that.intercom.send({'action': 'requestDump'}, {'exchange': that.options.exchange}, function (err) {
				if (err) return cb(err);
			});
		});
	});
}
/**
 * Handle one message received on the sync exchange.
 *
 * The message is always acked. Everything except the first message with
 * action "dumpResponse" is ignored. For that first response an http request
 * is sent to the first endpoint listed in the message, with the endpoint
 * token as a "token" header and options.requestOptions merged on top of the
 * request options. The temporary intercom is closed once the response has
 * been handled, whether or not a request could be sent.
 *
 * @param {object}   message             - Message from the intercom subscription
 * @param {string}   message.action      - Only "dumpResponse" is acted upon
 * @param {object[]} [message.endpoints] - Only the first entry is used: {protocol, host, port, token}; protocol defaults to "http"
 * @param {function} ack                 - Acknowledges the message, called unconditionally
 * @param {function} cb                  - cb(err) when no endpoint is given, the request fails or the status code is not 200; cb(null, res) with the http response otherwise
 */
SyncClient.prototype.handleMsg = function (message, ack, cb) {
	const reqOptions = {},
		logPrefix = topLogPrefix + 'SyncClient.prototype.handleMsg() - ',
		that = this;

	let endpoint,
		req;

	// The temporary intercom has served its purpose as soon as the first dump
	// response is handled, so every exit path after that point must release it
	function closeIntercom() {
		that.log.verbose(logPrefix + 'Closing temporary intercom');
		that.intercom.close();
	}

	that.log.debug(logPrefix + 'Incoming message: "' + JSON.stringify(message) + '"');

	ack();

	if ( ! message || message.action !== 'dumpResponse') {
		that.log.debug(logPrefix + 'Ignoring message because action is not "dumpResponse", but: "' + (message && message.action) + '"');

		return;
	}

	if (that.responseReceived !== false) {
		that.log.debug(logPrefix + 'Ignoring message because response is already received');

		return;
	}

	that.responseReceived = true; // Ignore future messages on this subscription

	// An empty endpoints list is as unusable as a missing one
	endpoint = message.endpoints && message.endpoints[0];

	if ( ! endpoint || typeof endpoint !== 'object') {
		const err = new Error('message.endpoints does not contain network endpoints to connecto to');

		that.log.error(logPrefix + err.message);
		closeIntercom();

		return cb(err);
	}

	// Default the protocol locally instead of writing it back into the incoming message
	reqOptions.protocol = (endpoint.protocol || 'http') + ':';
	reqOptions.host = endpoint.host;
	reqOptions.port = endpoint.port;
	reqOptions.headers = {'token': endpoint.token};

	// NOTE(review): both verbose logs below serialize reqOptions, which includes the token header
	that.log.verbose(logPrefix + 'Sending a sync request. reqOptions: ' + JSON.stringify(reqOptions));

	// Caller supplied options win over the ones derived from the endpoint
	Object.assign(reqOptions, that.options.requestOptions);

	that.log.verbose(logPrefix + 'Sending request: "' + JSON.stringify(reqOptions) + '"');

	try {
		req = http.request(reqOptions, function (res) {
			if (res.statusCode !== 200) {
				const err = new Error('Non 200 statusCode: ' + res.statusCode);

				that.log.error(logPrefix + 'Request failed: ' + err.message);

				// Nobody will read this body, drain it so the response can end
				res.resume();

				return cb(err);
			}

			res.on('error', function (err) {
				that.log.error(logPrefix + 'res.on(error): ' + err.message);
			});

			cb(null, res);
		});
	} catch (err) {
		// http.request() throws synchronously on invalid options, for example
		// a protocol that does not match the agent in use
		that.log.error(logPrefix + 'Request failed: ' + err.message);
		closeIntercom();

		return cb(err);
	}

	// Listen for errors before the request is flushed
	req.on('error', function (err) {
		that.log.error(logPrefix + 'Request failed: ' + err.message);
		cb(err);
	});

	req.end();

	closeIntercom();
};
exports = module.exports = SyncClient;