-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.js
More file actions
422 lines (340 loc) · 11 KB
/
server.js
File metadata and controls
422 lines (340 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
/**
 * Main Node.js server script for this project.
 * It exposes three HTTP endpoints through the fastify.post handlers below
 * (/api/check, /api/job and /api/has/job) and pushes the current job list
 * to connected clients over a WebSocket server.
 */
const { PKPWallet } = require('@lit-protocol/pkp-ethers.js-node');
const WebSocket = require("ws");
require('dotenv').config();
const { createClient } = require('@supabase/supabase-js');
const fs = require('fs');
const fetch = require("cross-fetch");
const fastify = require("fastify")({
logger: false,
});
// Port the Fastify HTTP API listens on.
const PORT = 8081;
// Port the WebSocket server listens on.
const WS_PORT = 8080;

// Credentials and endpoints are read from the environment (populated by dotenv).
const SUPABASE_URL = process.env.SUPABASE_URL;
const AUTH_SIG = process.env.AUTH_SIG;
const ORBIS_KEY = process.env.ORBIS_KEY;

// Registered jobs; replaced at startup by the contents of jobs.json when available.
let jobs = [];
// Stream ids of posts that have already been handled; replaced at startup from done.json.
let donePosts = [];
// Currently known WebSocket clients, keyed by a connection identifier.
const connections = {};
/**
 * Reads a JSON file from disk and returns its content, which must be an array.
 *
 * @param {string} file - Path of the JSON file to read.
 * @returns {Array} The parsed array.
 * @throws {Error} When the file cannot be read, is not valid JSON, or does not
 *   hold an array (the rest of the server indexes and filters these values).
 */
function loadJsonArray(file) {
  const parsed = JSON.parse(fs.readFileSync(file).toString());
  if (!Array.isArray(parsed)) {
    throw new TypeError(`${file} does not contain a JSON array`);
  }
  return parsed;
}

// load jobs from file
log("Loading jobs.json");
try {
  jobs = loadJsonArray('jobs.json');
  log("Job loaded");
} catch (e) {
  // Fall back to an empty list, but keep the reason so a corrupt file is not mistaken for a missing one.
  log(`Error loading jobs.json: ${e.message}`);
  jobs = [];
}

// load done posts from file
log("Loading done.json");
try {
  donePosts = loadJsonArray('done.json');
  log("Done posts loaded");
} catch (e) {
  log(`Error loading done.json: ${e.message}`);
  donePosts = [];
}

// Supabase client used to query the post indexer.
const indexer = createClient(SUPABASE_URL, ORBIS_KEY);
/**
 * Writes a timestamped message to the console and appends it to log.txt.
 *
 * The file append is asynchronous and best-effort: a failure is reported on
 * stderr instead of being silently discarded, and never throws to the caller.
 *
 * @param {string} msg - Text to log.
 */
function log(msg) {
  const logMsg = `[${getNow()}] ${msg}`;
  console.log(logMsg);
  fs.appendFile('log.txt', logMsg + '\n', (err) => {
    if (err) {
      console.error(`Failed to append to log.txt: ${err.message}`);
    }
  });
}
/**
 * Persists the in-memory job list and handled-post list to jobs.json and done.json.
 *
 * Each file is written synchronously and independently, so a failure on one
 * file does not prevent the other from being saved. "Cache saved" is logged
 * only after both writes succeeded; failures are logged with their reason.
 */
function saveCache() {
  const targets = [
    ['jobs.json', jobs],
    ['done.json', donePosts],
  ];
  let allSaved = true;
  for (const [file, value] of targets) {
    try {
      fs.writeFileSync(file, JSON.stringify(value, null, 2));
    } catch (e) {
      allSaved = false;
      log(`Error saving ${file}: ${e.message}`);
    }
  }
  if (allSaved) {
    log("Cache saved");
  }
}
// On Ctrl+C just request a normal exit. The 'exit' handler below performs the
// save, so saving here as well would write both files (and log) twice.
process.on('SIGINT', () => {
  process.exit();
});
// save file when process exits
process.on('exit', () => {
  saveCache();
});
// Recently received /api/check payloads, each stored with the time it arrived.
// Starts with a single blank placeholder entry.
let cache = [{ timestamp: "", data: "" }];
// WebSocket server used to push the job list to connected clients.
const wss = new WebSocket.Server({ port: WS_PORT, host: "0.0.0.0" });
wss.on('connection', (ws, req) => {
  // Key by address AND port so several clients behind one IP do not overwrite
  // each other. req.socket replaces the deprecated req.connection alias.
  const connection = `${req.socket.remoteAddress}:${req.socket.remotePort}`;
  connections[connection] = ws;

  // Forget the socket once it closes so the registry does not grow forever.
  ws.on('close', () => {
    if (connections[connection] === ws) {
      delete connections[connection];
    }
  });

  // An 'error' event without a listener would be thrown and take the process down.
  ws.on('error', (err) => {
    log(`WebSocket error from ${connection}: ${err.message}`);
  });
});

// Counts seen by the last status report; undefined until the first report.
let lastConnectionsLength;
let lastJobsLength;
/**
 * Returns the current UTC time formatted as "YYYY-MM-DD HH:mm:ss".
 *
 * @returns {string} The ISO-8601 timestamp with the "T" separator replaced by
 *   a space and the fractional seconds and zone suffix removed.
 */
function getNow() {
  const iso = new Date().toISOString();
  const fractionStart = iso.indexOf('.');
  return iso.slice(0, fractionStart).replace('T', ' ');
}
// Every 2 seconds, report the number of connections and jobs, but only when
// either figure differs from the one reported last time.
setInterval(() => {
  const connectionCount = Object.keys(connections).length;
  const jobCount = jobs.length;
  const unchanged =
    lastConnectionsLength === connectionCount && lastJobsLength === jobCount;
  if (unchanged) {
    return;
  }
  lastConnectionsLength = connectionCount;
  lastJobsLength = jobCount;
  log(`${lastConnectionsLength} connections and ${lastJobsLength} jobs`);
}, 2000);
// Text of the most recent idle notice written to the log. Remembered so the
// "no jobs" message is logged once per idle period instead of on every poll.
let state = '';

// Delay between polling passes, in milliseconds.
const POLL_INTERVAL_MS = 2000;
// Number of posts requested per job on each pass.
const POSTS_PAGE_SIZE = 50;
// Posts older than this are ignored: 9000000 ms = 150 minutes.
// NOTE(review): an earlier comment described this window as 5 minutes
// (300000 ms); the value actually in effect has been kept - confirm which is intended.
const MAX_POST_AGE_MS = 9000000;

/**
 * Marks a post as handled so later polling passes skip it.
 *
 * @param {string} streamId - Stream id of the post.
 */
function archivePost(streamId) {
  if (!donePosts.includes(streamId)) {
    donePosts.push(streamId);
    log(`Archived ${streamId}`);
  }
}

/**
 * Executes a "/send <address> <amount>" chat command by signing and sending a
 * transaction with the PKP wallet identified by pkpPubKey.
 *
 * Malformed commands and commands that fail to sign are archived because
 * retrying them cannot succeed. A failure while sending is NOT archived, so
 * the post is picked up again on a later pass (same as before this rewrite).
 *
 * @param {object} post - Post record; reads post.stream_id and post.content.body.
 * @param {string} pkpPubKey - Public key passed to PKPWallet.
 * @returns {Promise<void>}
 */
async function handleSendCommand(post, pkpPubKey) {
  const streamId = post.stream_id;
  const content = post.content.body;
  console.log("Found test message");
  console.log(content);

  // e.g. "/send 0x65d86B3E0E8B92a0FF6197Cb0fE5847835B78c5e 1"
  const commands = content.trim().split(/\s+/);
  const address = commands[1];
  const amount = commands[2];

  // Only plain decimal amounts are accepted; anything else is rejected rather
  // than being guessed at by parseInt when money is being moved.
  const amountIsValid = typeof amount === 'string' && /^\d+$/.test(amount);
  const value = amountIsValid ? Number.parseInt(amount, 10) : NaN;
  if (!address || !Number.isSafeInteger(value)) {
    log(`Ignoring malformed /send command in ${streamId}`);
    archivePost(streamId);
    return;
  }

  const pkpWallet = new PKPWallet({
    pkpPubKey: pkpPubKey,
    controllerAuthSig: JSON.parse(AUTH_SIG),
    provider: "https://rpc-mumbai.maticvigil.com",
  });
  await pkpWallet.init();

  const tx = {
    to: address,
    value: value,
  };
  console.log(tx);

  let signedTx;
  console.log("signing transaction");
  try {
    signedTx = await pkpWallet.signTransaction(tx);
  } catch (e) {
    console.log("Error signing transaction");
    console.log(e);
    archivePost(streamId);
    return;
  }

  console.log("sending transaction");
  try {
    await pkpWallet.sendTransaction(signedTx);
  } catch (e) {
    console.log("Error sending transaction");
    console.log(e);
    return;
  }

  log("Sent transaction");
  // save it to done tasks if not already done
  archivePost(streamId);
}

/**
 * Fetches the recent posts for one chat_message job and runs any pending
 * "/send" commands found in them.
 *
 * @param {object} job - Job record; reads job.params.pkp.did and job.params.pkp.pubKey.
 * @returns {Promise<void>}
 */
async function processChatMessageJob(job) {
  const did = job.params.pkp.did;
  const pkpPubKey = job.params.pkp.pubKey;
  const page = 0;

  let posts;
  try {
    // fetch orbis posts by did
    const response = await indexer.rpc("all_did_master_posts", {
      post_did: did,
    }).range(page * POSTS_PAGE_SIZE, (page + 1) * POSTS_PAGE_SIZE - 1);
    if (response.error) {
      console.log(response.error);
    }
    posts = response.data;
  } catch (e) {
    console.log(e);
    return;
  }
  if (!Array.isArray(posts)) {
    return;
  }

  const oldestAllowed = Date.now() - MAX_POST_AGE_MS;
  for (const post of posts) {
    const content = post && post.content ? post.content.body : undefined;
    // Posts without a text body cannot carry a command.
    if (typeof content !== 'string') {
      continue;
    }
    // ignore posts that are too old (post.timestamp is multiplied by 1000 to compare with Date.now())
    if (post.timestamp * 1000 < oldestAllowed) {
      continue;
    }
    // ignore if post is already done
    if (donePosts.includes(post.stream_id)) {
      continue;
    }
    if (content.includes("/send")) {
      await handleSendCommand(post, pkpPubKey);
    }
  }
}

/**
 * Sends the current job list, serialized as JSON, to every open WebSocket
 * client. Closed sockets are dropped from the registry, and a failing send
 * is logged without interrupting delivery to the remaining clients.
 */
function broadcastJobs() {
  const payload = JSON.stringify(jobs);
  for (const key of Object.keys(connections)) {
    const socket = connections[key];
    if (socket.readyState === WebSocket.CLOSED) {
      delete connections[key];
      continue;
    }
    if (socket.readyState !== WebSocket.OPEN) {
      continue;
    }
    try {
      socket.send(payload);
    } catch (e) {
      log(`Error sending jobs to ${key}: ${e.message}`);
    }
  }
}

/**
 * Performs one polling pass: handles every chat_message job, then broadcasts
 * the job list. When there are no jobs, an idle notice is logged once.
 *
 * @returns {Promise<void>}
 */
async function runPollingPass() {
  if (jobs.length === 0) {
    const msg = "no jobs found. contiune.";
    if (state !== msg) {
      // Remember the notice so it is not repeated on every idle pass.
      state = msg;
      log(msg);
    }
    return;
  }
  // Work is available again: allow the idle notice to be logged next time.
  state = '';

  // Iterate over a snapshot: /api/job may add or remove jobs while a pass awaits.
  for (const job of jobs.slice()) {
    if (job.task !== "chat_message") {
      continue;
    }
    try {
      await processChatMessageJob(job);
    } catch (e) {
      // One bad job or post must not stop the remaining jobs from being handled.
      log(`Error processing job: ${e && e.message ? e.message : e}`);
    }
  }

  // for each connection, send message
  broadcastJobs();
}

/**
 * Background worker: runs a polling pass every POLL_INTERVAL_MS milliseconds,
 * forever. Errors raised by a pass are logged and the loop keeps running, so
 * a single failing command can no longer stop job processing until restart.
 *
 * @returns {Promise<never>}
 */
async function infiniteLoop() {
  while (true) {
    await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
    try {
      await runPollingPass();
    } catch (e) {
      log(`Polling pass failed: ${e && e.message ? e.message : e}`);
    }
  }
}

infiniteLoop().catch((e) => {
  console.error(e);
});
// Entries older than this are removed from the duplicate-detection cache.
// It equals the pruning period, so an entry is never dropped while it is
// still younger than one minute (wiping everything used to discard entries
// that had only just been recorded, letting duplicates slip through).
const CACHE_MAX_AGE_MS = 60000;

setInterval(() => {
  if (cache.length > 0) {
    console.log(`cache has length ${cache.length}`);
    console.log("clearing now");
    const now = Date.now();
    // Timestamps are coerced to numbers by the subtraction; the blank
    // placeholder entry coerces to 0 and is therefore always removed.
    cache = cache.filter((entry) => now - entry.timestamp < CACHE_MAX_AGE_MS);
    console.log(`cache cleared. Now it has ${cache.length}`);
  }
}, 60000);
/**
 * Tells whether an entry holding the same data was recorded recently.
 *
 * Two payloads are considered the same when their JSON serializations are
 * identical. The candidate payload is serialized at most once, and only when
 * at least one entry falls inside the time window.
 *
 * @param {Array<{timestamp: (Date|number|string), data: *}>} arr - Recorded
 *   entries; timestamps are coerced to numbers by subtraction.
 * @param {*} data - Payload to look for.
 * @param {number} elapsedTime - Window size in milliseconds.
 * @returns {boolean} True when a matching entry is younger than elapsedTime.
 */
function hasSameDataWithinElapsedTime(arr, data, elapsedTime) {
  const now = Date.now();
  const recent = arr.filter((entry) => now - entry.timestamp < elapsedTime);
  if (recent.length === 0) {
    return false;
  }
  const serialized = JSON.stringify(data);
  return recent.some((entry) => JSON.stringify(entry.data) === serialized);
}
// Register CORS with per-request options.
fastify.register(require("@fastify/cors"), (instance) => {
  return (req, callback) => {
    // CORS headers are omitted only when the Origin header matches the
    // pattern below; every other request gets its origin reflected.
    // Reflecting the origin is NOT recommended for production as it enables
    // reflection exploits.
    // NOTE(review): browsers normally send a scheme (and port) in Origin, so
    // this pattern may never match in practice - confirm the intent.
    const isLocalhost = /^localhost$/m.test(req.headers.origin);

    // callback expects two parameters: error and options
    callback(null, { origin: !isLocalhost });
  };
});
// POST /api/check
// Replies { status: "nope" } when an identical body was recorded within the
// last 5000 ms; otherwise records the body and replies { status: "ok" }.
fastify.post("/api/check", async (req, res) => {
  const payload = req.body;
  const isDuplicate = hasSameDataWithinElapsedTime(cache, payload, 5000);

  if (isDuplicate) {
    console.log("Same data found within the specified elapsed time");
  } else {
    console.log("No same data found within the specified elapsed time");
    cache.push({ timestamp: new Date(), data: payload });
  }

  res
    .code(200)
    .header("Content-Type", "application/json; charset=utf-8")
    .send({ status: isDuplicate ? "nope" : "ok" });
});
// POST /api/job
// Body: { task, params: { pkp: { address, ... }, task? } }
//   task "chat_message": registers the body as a job unless a job with the
//     same pkp address and task already exists.
//   task "remove_job": removes the job whose pkp address matches and whose
//     task equals params.task.
// A body without params.pkp is rejected with 400 instead of failing with an
// unhandled TypeError. Any other task keeps replying 500 "unrecognized task".
fastify.post('/api/job', async (req, res) => {
  const data = req.body;
  const sendJson = (statusCode, payload) =>
    res
      .code(statusCode)
      .header("Content-Type", "application/json; charset=utf-8")
      .send(payload);

  const pkp = data && data.params ? data.params.pkp : undefined;
  if (pkp === null || typeof pkp !== 'object') {
    sendJson(400, { status: 400, message: "missing params.pkp" });
    log("job rejected: missing params.pkp");
    return;
  }
  const pkpAddress = pkp.address;

  if (data.task === 'chat_message') {
    // Only one job per (address, task) pair is kept.
    const alreadyQueued = jobs.some(
      (job) => job.params.pkp.address === pkpAddress && job.task === data.task
    );
    if (!alreadyQueued) {
      jobs.push(data);
      sendJson(200, { status: "ok" });
      log(`a new job has been added ${JSON.stringify(data.params.pkp.address)}`);
    } else {
      sendJson(200, { status: "job already exists" });
      log(`job already exists ${data.params.pkp.address}`);
    }
  } else if (data.task === 'remove_job') {
    // remove the job from jobs array
    const index = jobs.findIndex(
      (job) => job.params.pkp.address === pkpAddress && job.task === data.params.task
    );
    if (index > -1) {
      jobs.splice(index, 1);
      sendJson(200, { status: "ok" });
      log(`job removed ${data.params.task}`);
    } else {
      sendJson(200, { status: "job not found" });
      log(`job not found ${data.params.task}`);
    }
  } else {
    sendJson(500, { status: 500, message: "unrecognized task" });
    // Report the task that was actually requested, not params.task.
    log(`job not recognized ${data.task}`);
  }
  console.log("data => ", data);
})
// POST /api/has/job
// Body: { params: { pkp: { address } } }
// Replies { status: "job exists" } when any registered job has the given pkp
// address (regardless of its task), otherwise { status: "no job" }.
// A body without params.pkp is rejected with 400 instead of failing with an
// unhandled TypeError. This endpoint never modifies the job list.
fastify.post('/api/has/job', async (req, res) => {
  const data = req.body;
  const sendJson = (statusCode, payload) =>
    res
      .code(statusCode)
      .header("Content-Type", "application/json; charset=utf-8")
      .send(payload);

  const pkp = data && data.params ? data.params.pkp : undefined;
  if (pkp === null || typeof pkp !== 'object') {
    sendJson(400, { status: 400, message: "missing params.pkp" });
    return;
  }
  const pkpAddress = pkp.address;

  const hasJob = jobs.some((job) => job.params.pkp.address === pkpAddress);
  sendJson(200, { status: hasJob ? "job exists" : "no job" });
})
// Start the HTTP server on all interfaces. A startup failure is printed and
// ends the process with exit code 1; otherwise the bound address is printed.
fastify.listen({ port: PORT, host: "0.0.0.0" }, (err, address) => {
  if (!err) {
    console.log(`Your app is listening on ${address}`);
    return;
  }
  console.error(err);
  process.exit(1);
});