-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathweb_server.hpp
More file actions
346 lines (300 loc) · 14.1 KB
/
Copy pathweb_server.hpp
File metadata and controls
346 lines (300 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/**
* @file web_server.hpp
* @brief Business logic layer. Owns WorkerPool, Predictor, and DispatchPool.
* Counts RPS per second and drives predictive pre-warming via ScaleTo().
* Supports five scaling modes: ewma | ewma_adaptive | reactive | static | arima.
*
* CSCI 599: Network Systems for Cloud Computing
* University of Southern California
*/
#ifndef __WEB_SERVER_HPP__
#define __WEB_SERVER_HPP__
#include "tcp_server.hpp"
#include "util.hpp"
#include "WorkerPool.hpp"
#include "Predictor.hpp"
#include "DispatchPool.hpp"
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <cerrno>
#include <chrono>
#include <cmath>
#include <cstdio>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
// Filesystem path of the Unix-domain control socket used to query the ARIMA
// predictor process (connected to in _query_arima, unlinked in ~web_server).
static constexpr const char* ARIMA_CTRL_SOCK = "/tmp/faas_arima.sock";
// Default TCP listen port; used as the default `port` argument of web_server.
#define PORT 8080
// Cold-start fallback response (sent when no warm worker is available).
// The simulated 800ms cold-start delay is NOT part of this string: the
// dispatch task sleeps before sending it, to make the penalty visible.
// Content-Length (21) must match the length of the body literal below.
// constexpr (like ARIMA_CTRL_SOCK) so the pointer itself cannot be reassigned.
static constexpr const char* COLD_START_FALLBACK =
    "HTTP/1.1 200 OK\r\n"
    "Content-Length: 21\r\n"
    "Connection: close\r\n\r\n"
    "[Cold Start Fallback]";
namespace fengcheng {
class web_server {
private:
std::unique_ptr<fengcheng::tcp_server> __reactor;
fengcheng::WorkerPool __worker_pool;
Predictor __predictor;
DispatchPool __dispatch_pool;
int __current_second_req_count;
time_t __last_time;
std::string __mode; // "ewma" | "reactive" | "static" | "arima"
int __static_n; // only used in "static" mode
pid_t __arima_pid; // only used in "arima" mode
public:
web_server(int port = PORT, const std::string& mode = "ewma",
int static_n = 5, bool use_cow_template = true)
: __worker_pool("/tmp/faas_worker_", mode == "static" ? 0 : 6, use_cow_template),
// static mode: idle_timeout=0 (scavenger disabled — pool size is fixed permanently)
// other modes: idle_timeout=6s (scavenger kills workers idle >6s for scale-down)
__dispatch_pool(64), // 64 threads: handles 30 RPS × 0.8s cold = 24 simultaneous
__current_second_req_count(0),
__mode(mode),
__static_n(static_n),
__arima_pid(-1) {
__reactor = std::unique_ptr<fengcheng::tcp_server>(new fengcheng::tcp_server(port));
__last_time = time(nullptr);
if (__mode == "static") {
// Pre-warm N workers immediately at startup; they stay alive forever (no scavenger).
__worker_pool.ScaleTo(__static_n);
logMessage(NORMAL, "[WebServer/static] Pre-warming %d workers at startup.", __static_n);
} else if (__mode == "arima") {
_start_arima_process();
}
}
~web_server() {
if (__arima_pid > 0) {
kill(__arima_pid, SIGKILL);
waitpid(__arima_pid, nullptr, 0);
unlink(ARIMA_CTRL_SOCK);
logMessage(NORMAL, "[WebServer/arima] ARIMA predictor process (pid=%d) terminated.", __arima_pid);
}
}
void start() {
__reactor->dispather(
std::bind(&web_server::respones, this,
std::placeholders::_1, std::placeholders::_2));
}
// Called on the Reactor thread for every complete HTTP request.
// MUST return quickly — no blocking I/O here.
// All UDS connect/send/recv work is pushed to DispatchPool threads.
void respones(fengcheng::connection* conn, std::string& request) {
// Parse HTTP: find the body after the blank line
auto pos = request.find("\r\n\r\n");
if (pos == std::string::npos) return; // incomplete request — wait for more data
std::string body = request.substr(pos + 4); // copy before clearing buffer
conn->__in_buffer.clear();
// --- Control plane: update predictor every 2 seconds ---
// A 2-second window averages out the per-second measurement noise caused by
// load-tester thread scheduling vs. server wall-clock window misalignment.
// Without this, individual seconds can measure as low as 5 RPS during a
// 20 RPS ramp, causing CUSUM to reset before reaching the alarm threshold.
__current_second_req_count++;
time_t now = time(nullptr);
if (now - __last_time >= 2) {
int rps = __current_second_req_count / 2; // average RPS over 2s
__last_time = now;
__current_second_req_count = 0;
int target;
if (__mode == "reactive") {
target = __predictor.ReactivePredict(rps);
} else if (__mode == "static") {
target = __static_n;
} else if (__mode == "arima") {
double T = __predictor.GetAvgServiceTime();
double latency_ms = 0.0;
target = _query_arima(rps, T, latency_ms);
// Emit canonical [PredLatency] line so the analysis script
// can grep ARIMA latency the same way as EWMA / CUSUM.
// ARIMA us = subprocess RTT (Unix-socket round-trip + statsmodels fit).
long long us = (long long)(latency_ms * 1000.0);
logMessage(NORMAL, "[PredLatency] mode=arima us=%lld rps=%d target=%d",
us, rps, target);
logMessage(NORMAL, "[WebServer/arima] RPS=%d IdleWorkers=%d Target=%d InferenceLatency=%.2fms",
rps, __worker_pool.IdleCount(), target, latency_ms);
} else if (__mode == "ewma_adaptive") {
// EWMA + Standardized CUSUM (baseline-invariant via EWMSD).
target = __predictor.AdaptivePredict(rps);
} else {
// default: "ewma" — EWMA + fixed-drift CUSUM + Little's Law
target = __predictor.UpdateAndPredict(rps);
}
if (__mode != "arima") {
logMessage(NORMAL, "[WebServer/%s] RPS=%d IdleWorkers=%d Target=%d",
__mode.c_str(), rps, __worker_pool.IdleCount(), target);
}
// ScaleTo now contacts the template process via UDS (blocking I/O).
// Push to DispatchPool to avoid blocking the Reactor thread.
fengcheng::WorkerPool* pool_ptr_scale = &__worker_pool;
__dispatch_pool.push([pool_ptr_scale, target]() {
pool_ptr_scale->ScaleTo(target);
});
}
// Save everything we need BEFORE detaching the connection object
int client_fd = conn->__sock;
fengcheng::tcp_server* tsvr = conn->__tsvr;
fengcheng::WorkerPool* pool_ptr = &__worker_pool;
Predictor* pred_ptr = &__predictor;
int worker_id = __worker_pool.GetIdleWorkerId();
// Detach the client fd from the Reactor: removes from epoll + connection_map,
// deletes the connection object, but does NOT close the fd.
// Ownership transfers to the DispatchPool task — it must close(client_fd).
tsvr->detach_connection(client_fd);
// conn is now a dangling pointer — never touch it again after this line.
if (worker_id == -1) {
// No warm workers available: send cold-start fallback in a dispatch thread.
// The 800ms sleep simulates the cold-start penalty for measurement purposes.
__dispatch_pool.push([client_fd]() {
usleep(800000);
send(client_fd, COLD_START_FALLBACK, strlen(COLD_START_FALLBACK), MSG_NOSIGNAL);
close(client_fd);
});
logMessage(WARNING, "[WebServer] No idle worker — queued cold-start fallback (fd=%d)", client_fd);
return;
}
std::string sock_path = __worker_pool.GetSockPath(worker_id);
// Dispatch the full UDS pipeline to a worker thread.
// The Reactor returns immediately and continues processing other events.
__dispatch_pool.push([client_fd, body, worker_id, sock_path, pool_ptr, pred_ptr]() {
auto t_start = std::chrono::steady_clock::now();
// --- Connect to the pre-warmed Python worker via Unix Domain Socket ---
int worker_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (worker_fd < 0) {
pool_ptr->ReturnWorkerId(worker_id);
close(client_fd);
return;
}
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, sock_path.c_str(), sizeof(addr.sun_path) - 1);
if (connect(worker_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
logMessage(ERROR, "[Dispatch] connect to worker %d failed: %s",
worker_id, strerror(errno));
close(worker_fd);
pool_ptr->ReturnWorkerId(worker_id);
const char* err = "HTTP/1.1 500 Internal Server Error\r\n"
"Content-Length: 0\r\nConnection: close\r\n\r\n";
send(client_fd, err, strlen(err), MSG_NOSIGNAL);
close(client_fd);
return;
}
// --- Forward request body to Python worker ---
if (send(worker_fd, body.c_str(), body.size(), MSG_NOSIGNAL) < 0) {
logMessage(ERROR, "[Dispatch] send to worker %d failed: %s",
worker_id, strerror(errno));
close(worker_fd);
pool_ptr->ReturnWorkerId(worker_id);
close(client_fd);
return;
}
// Signal EOF to the worker so it knows the full request arrived
shutdown(worker_fd, SHUT_WR);
// --- Receive complete response from Python worker (blocking until EOF) ---
std::string result;
char buf[4096];
while (true) {
ssize_t n = recv(worker_fd, buf, sizeof(buf) - 1, 0);
if (n > 0) {
result.append(buf, n);
} else if (n == 0) {
break; // EOF: worker closed the connection after sending response
} else {
if (errno == EINTR) continue;
logMessage(ERROR, "[Dispatch] recv from worker %d failed: %s",
worker_id, strerror(errno));
break;
}
}
close(worker_fd);
// Worker is free for the next request
pool_ptr->ReturnWorkerId(worker_id);
// Feed measured service time back to the predictor (Little's Law T)
auto t_end = std::chrono::steady_clock::now();
double elapsed = std::chrono::duration<double>(t_end - t_start).count();
pred_ptr->UpdateServiceTime(elapsed);
// --- Send result back to the original HTTP client ---
if (!result.empty()) {
send(client_fd, result.c_str(), result.size(), MSG_NOSIGNAL);
}
close(client_fd); // we own this fd — close it when done
});
}
private:
// -----------------------------------------------------------------------
// ARIMA process management
// -----------------------------------------------------------------------
// Fork and exec arima_predictor.py. Block until it writes "READY\n" to
// the pipe (socket is bound and listening), then return.
void _start_arima_process() {
int pipefd[2];
if (pipe(pipefd) < 0) {
logMessage(FATAL, "[WebServer/arima] pipe() failed: %s", strerror(errno));
return;
}
pid_t pid = fork();
if (pid == 0) {
// ---- CHILD: become the ARIMA predictor process ----
close(pipefd[0]);
for (int fd = 3; fd < 1024; fd++) {
if (fd != pipefd[1]) close(fd);
}
char fd_str[16];
snprintf(fd_str, sizeof(fd_str), "%d", pipefd[1]);
execlp("python3", "python3", "arima_predictor.py", fd_str, nullptr);
_exit(1);
}
close(pipefd[1]);
if (pid < 0) {
logMessage(FATAL, "[WebServer/arima] fork() failed: %s", strerror(errno));
close(pipefd[0]);
return;
}
logMessage(NORMAL, "[WebServer/arima] ARIMA predictor process forked (pid=%d). Waiting for READY...", pid);
char buf[32] = {};
ssize_t n = read(pipefd[0], buf, sizeof(buf) - 1);
close(pipefd[0]);
if (n <= 0 || strstr(buf, "READY") == nullptr) {
logMessage(FATAL, "[WebServer/arima] ARIMA process did not signal READY.");
kill(pid, SIGKILL);
waitpid(pid, nullptr, 0);
return;
}
__arima_pid = pid;
logMessage(NORMAL, "[WebServer/arima] ARIMA predictor READY (pid=%d).", pid);
}
// Send <rps>,<T> to the ARIMA process and receive <target>,<latency_ms>.
// Returns target worker count; writes inference latency into latency_ms_out.
// Falls back to reactive on any error.
int _query_arima(int rps, double T, double& latency_ms_out) {
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) return (int)std::ceil(rps * T) + 1;
struct sockaddr_un addr = {};
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, ARIMA_CTRL_SOCK, sizeof(addr.sun_path) - 1);
if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
close(fd);
logMessage(WARNING, "[WebServer/arima] connect to ARIMA sock failed: %s", strerror(errno));
return (int)std::ceil(rps * T) + 1;
}
// Send: "<rps>,<T>\n"
char msg[64];
snprintf(msg, sizeof(msg), "%d,%.4f\n", rps, T);
send(fd, msg, strlen(msg), MSG_NOSIGNAL);
// Recv: "<target>,<latency_ms>\n"
char resp[64] = {};
ssize_t n = recv(fd, resp, sizeof(resp) - 1, 0);
close(fd);
if (n <= 0) return (int)std::ceil(rps * T) + 1;
int target = 1;
double lat = 0.0;
if (sscanf(resp, "%d,%lf", &target, &lat) == 2) {
latency_ms_out = lat;
}
return target;
}
};
} // namespace fengcheng
#endif