forked from proddata/nodeIngestBench
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.js
More file actions
169 lines (144 loc) · 4.22 KB
/
app.js
File metadata and controls
169 lines (144 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
require("dotenv").config();
const argv = require("minimist")(process.argv.slice(2));
const { Worker, workerData } = require("worker_threads");
const axios = require("axios");
const https = require("https");
const dataGenerator = require("./modules/dataGenerator");
const sqlGenerator = require("./modules/sqlGenerator");
const worker = new Worker("./app_worker.js");
const crateConfig = {
user: process.env.CRATE_USER || "crate",
host: process.env.CRATE_HOST || "localhost",
password: process.env.CRATE_PASSWORD || "",
port: process.env.CRATE_PORT || 4200,
};
const options = {
dropTable: true,
batchsize: Number(argv.batchsize) || 10000,
max_rows: Number(argv.max_rows) || 1 * 1000 * 1000,
table: argv.table || "georg.cpu2",
shards: Number(argv.shards) || 12,
concurrent_requests: Number(argv.concurrent_requests) || 20
};
console.log("-------- Options ---------");
console.log(options);
console.log("--------------------------");
// Axios CrateDB HTTP setup
const crate_api = `https://${crateConfig.host}:${crateConfig.port}/_sql`;
const agent = new https.Agent({
rejectUnauthorized: false
});
const crate_api_config = {
auth: {
username: crateConfig.user,
password: crateConfig.password
},
httpsAgent: agent
};
// SQL Statements
const STATEMENT = {
dropTable: sqlGenerator.getDropTable(options.table),
createTable: sqlGenerator.getCreateTable(options.table, options.shards),
insert: sqlGenerator.getInsert(options.table),
numDocs: sqlGenerator.getNumDocs(options.table),
refresh: sqlGenerator.getRefreshTable(options.table)
};
let args_buffer = getNewBufferSync();
let stats = {
inserts: 0,
inserts_done: 0,
inserts_max: Math.ceil(options.max_rows / options.batchsize),
ts_start: -1,
ts_end: -1
};
setup();
// Benchmark Logic
async function setup() {
await prepareTable();
stats.ts_start = Date.now() / 1000;
addInsert();
}
async function prepareTable() {
try {
await axios.post(
crate_api,
{ stmt: STATEMENT.dropTable },
crate_api_config
);
await axios.post(
crate_api,
{ stmt: STATEMENT.createTable },
crate_api_config
);
} catch (err) {
console.log(err);
}
}
async function addInsert() {
if (stats.inserts <= stats.inserts_max) {
if (stats.inserts - stats.inserts_done < options.concurrent_requests) {
stats.inserts++;
insert();
if (stats.inserts % options.concurrent_requests == 0) {
getNewBuffer();
}
addInsert();
} else {
setTimeout(addInsert, 10);
}
}
}
async function insert() {
let args_buffer_no = stats.inserts % options.concurrent_requests;
let body = {
stmt: STATEMENT.insert,
bulk_args: args_buffer[args_buffer_no]
};
try {
await axios.post(crate_api, body, crate_api_config);
} catch (err) {
console.log(err.response.data);
} finally {
stats.inserts_done++;
if (stats.inserts_done == stats.inserts_max) {
finish();
}
}
}
async function finish() {
stats.ts_end = Date.now() / 1000;
await axios.post(crate_api, { stmt: STATEMENT.refresh }, crate_api_config);
let time = stats.ts_end - stats.ts_start;
let records = stats.inserts_done * options.batchsize;
let speed = records / time;
console.log("-------- Results ---------");
console.log("Time\t", time.toLocaleString(), "s");
console.log("Rows\t", records.toLocaleString(), "records");
console.log("Speed\t", speed.toLocaleString(), "rows per sec");
console.log("-------- Results ---------");
}
// Worker handling
async function getNewBuffer() {
worker.postMessage(options);
}
function getNewBufferSync() {
return new Array(options.concurrent_requests).fill(
dataGenerator.getCPUObjectBulkArray(options.batchsize)
);
}
async function updateBuffer(message) {
args_buffer = message.args_buffer;
let progress = (stats.inserts_done * options.batchsize).toLocaleString();
console.log("Buffer updated - sent: ", progress);
}
worker.on("message", updateBuffer);
worker.on("error", msg => console.log(msg));
worker.on("exit", code => {
if (code !== 0)
reject(new Error(`Stopped the Worker Thread with the exit code: ${code}`));
});
process.on("SIGTERM", function () {
worker.postMessage({ exit: true });
console.log("Finished all requests");
process.exit();
});