-
Notifications
You must be signed in to change notification settings - Fork 119
Expand file tree
/
Copy pathtorpedoCustomFilter.js
More file actions
163 lines (127 loc) · 5.39 KB
/
torpedoCustomFilter.js
File metadata and controls
163 lines (127 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Implementation of custom filter for Torpedo
// It will take in a batch and then
// a) Emit 1:1 request for each message with "GA" as integration
// b) Emit 1:1 request for each non-spin message with "AM" as integration
const cluster = require("cluster");
const http = require("http");
const numCPUs = require("os").cpus().length;
const url = require("url");
const jsonQ = require("jsonq");
const amplitudeJS = require("./v0/AmplitudeTransform.js");
require("./util/logUtil");
function start(port) {
if (!port) {
port = 9393;
}
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on("exit", (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
// Main server body
http
.createServer(function(request, response) {
var pathname = url.parse(request.url).pathname;
// Adding logic for a call that will invalidate cache
// for particular module in order that next require call for
// that module will reload the same
if (request.method == "POST") {
var body = "";
var respBody = "";
request.on("data", function(data) {
body += data;
// Too much POST data, kill the connection!
// 1e6 === 1 * Math.pow(10, 6) === 1 * 1000000 ~~~ 1MB
if (body.length > 1e6) request.connection.destroy();
});
request.on("end", async function() {
try {
// need to send 400 error for malformed JSON
response.statusCode = 200;
var requestJson = JSON.parse(body);
var jsonQobj = jsonQ(requestJson);
// Iterate through the messages
// But first create the message list that will finally hold all messages
var messageList = [];
// Also create the map structure where total_payments will be
// tracked per user
var userTotalPayments = new Map();
// Also map structures for keeping user-specific
// context just for completeness sake. They
// do not influence processing in any way
var userContext = new Map();
// And finally a counter for catching every n-th message
var messageCounter = 0;
jsonQobj.find("rl_message").each(function(index, path, value) {
// Extract the rl_anonymous_id for direct inclusion under
// rl_message
var anonymousId = jsonQ(value)
.find("rl_anonymous_id")
.value()[0];
// rl_event will be required for populating "ec"
var eventName = jsonQ(value)
.find("rl_message")
.find("rl_event")
.value()[0];
// Construct single message
var messageObj = {};
// GA message is as-is full payload
messageObj.rl_message = { ...value };
// Set rl_integrations to only GA
// messageObj['rl_message']['rl_integrations'] = 'GA';
messageObj.rl_message.rl_integrations =
'{"All": false, "GA":true}';
// Add rl_anonymous_id
messageObj.rl_message.rl_anonymous_id = anonymousId;
// set category to rl_event value
// Temporary fix for non-existent rl_properties
if (!messageObj.rl_message.rl_properties) {
messageObj.rl_message.rl_properties = {};
}
messageObj.rl_message.rl_properties.category = eventName;
// Add the GA message
messageList.push(messageObj);
// Send only non-spin events to Amplitude
if (!(eventName && eventName.match(/spin_result/g))) {
// non-spin event
// Repeat construction
var messageObjAM = {};
// Amplitude message is as-is full payload for non-spin message
messageObjAM.rl_message = { ...value };
// Set rl_integrations to only Amplitude
messageObjAM.rl_message.rl_integrations =
'{"All": false, "AM":true}';
// Add rl_anonymous_id
messageObjAM.rl_message.rl_anonymous_id = anonymousId;
// Add the AM message
messageList.push(messageObjAM);
}
});
// Construct overall payload
var responseObj = {};
responseObj.sent_at = String(jsonQobj.find("sent_at").value());
responseObj.batch = messageList;
// response.end(JSON.stringify(responseObj));
// console.log(JSON.stringify(messageList))
response.end(JSON.stringify(messageList));
// response.end(body);
} catch (se) {
response.statusCode = 500; // 500 for other errors
response.statusMessage = se.message;
console.log(se.stack);
response.end();
}
});
}
})
.listen(port);
console.log(`Worker ${process.pid} started`);
}
console.log("aggregatorServer: started");
}
start(9393);