-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPredictor.hpp
More file actions
214 lines (180 loc) · 9.94 KB
/
Copy pathPredictor.hpp
File metadata and controls
214 lines (180 loc) · 9.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/**
* @file Predictor.hpp
* @brief Predictive pre-warming engine.
* EWMA tracks the periodic traffic baseline; CUSUM detects sudden spikes;
* Little's Law (N = ceil(lambda * T) + margin) determines target worker count.
*
* CSCI 599: Network Systems for Cloud Computing
* University of Southern California
*/
#ifndef __PREDICTOR_HPP__
#define __PREDICTOR_HPP__
#include <iostream>
#include <algorithm>
#include <chrono>
#include <cmath>
#include "log.hpp"
// Predictive pre-warming controller (V2.0 Plan).
//
// Algorithm:
// 1. EWMA — smooths periodic baseline, filters white noise.
// 2. CUSUM — O(1) sudden-change detector; fires when traffic spikes above baseline.
// 3. Little's Law: N = ceil(lambda * T) + M_safety
// lambda = EWMA-predicted RPS (or current_rps*1.5 on CUSUM alarm)
// T = avg service time per request (dynamically updated via feedback)
// M_safety = safety margin buffer
class Predictor {
private:
double _alpha; // EWMA smoothing factor (0.1–0.3)
double _ewma; // Current EWMA baseline
double _cusum; // CUSUM accumulator
double _drift; // Steady-state tolerance band (allowed deviation)
double _threshold; // CUSUM alarm threshold
// --- Standardized CUSUM state (used only by AdaptivePredict) ---
// Tracks EWMSD: an EWMA of |rps - baseline|, used as a running-scale proxy
// for the typical deviation magnitude. Each observation is divided by
// max(sigma, min_sigma) to produce a dimensionless z-score, so the alarm
// threshold is invariant to the absolute RPS level and to clock-aliasing
// effects that make a single window measure lower than its phase average.
double _sigma_ewma; // EWMSD estimate of typical |deviation|
double _beta; // EWMSD smoothing factor
double _k_std; // Page-Hinkley allowance in z-score units
double _h_std; // standardized alarm threshold
double _min_sigma; // lower clamp on sigma (avoid z explosion at startup)
double _cusum_std; // standardized CUSUM accumulator
double _avg_service_time; // T: average service time per request (seconds)
int _safety_margin; // M_safety: extra workers for prediction error buffer
public:
Predictor(double alpha = 0.2,
double drift = 5.0,
double threshold = 8.0, // CUSUM alarm threshold (tuned for 2s window).
// With true 20 RPS ramp and 2s windows:
// window1: CUSUM=8.6>8 → fires at ramp sec2,
// 2s BEFORE spike peak, workers pre-warmed.
// Safe at warmup: deviation=(2-2-5)<0 → CUSUM=0.
double avg_service_time = 0.1,
int safety_margin = 1)
: _alpha(alpha), _ewma(0),
_cusum(0), _drift(drift), _threshold(threshold),
_sigma_ewma(0), _beta(0.3), _k_std(0.5), _h_std(3.0), _min_sigma(1.0),
_cusum_std(0),
_avg_service_time(avg_service_time),
_safety_margin(safety_margin) {}
double GetAvgServiceTime() const { return _avg_service_time; }
// Feedback from DispatchPool threads: feed observed end-to-end latency (seconds)
// so T tracks real workload characteristics rather than a static guess.
//
// Sanity clamp: worker.py sleeps 0.5s per request; cold-start fallback is 0.8s;
// legitimate end-to-end never exceeds ~2s even under queuing. Observations > 5s
// indicate pathological state (OS backpressure, client suspend, late-arriving
// socket completion) and must NOT be smoothed into T — once observed before:
// T drifted 0.5s -> 91s -> 322s -> Little's Law produced Target=443 -> OOM.
void UpdateServiceTime(double observed_seconds) {
if (observed_seconds > 5.0 || observed_seconds < 0.0) {
logMessage(WARNING, "[Predictor] Discarded anomalous service time %.3fs (T kept at %.3fs)",
observed_seconds, _avg_service_time);
return;
}
_avg_service_time = 0.1 * observed_seconds + 0.9 * _avg_service_time;
}
// Core function: called once per second with the observed RPS.
// Returns the target total number of warm workers to maintain.
int UpdateAndPredict(int current_rps) {
auto t0 = std::chrono::high_resolution_clock::now();
// Step 1: EWMA — track periodic baseline
if (_ewma == 0.0) {
_ewma = current_rps;
} else {
_ewma = _alpha * current_rps + (1.0 - _alpha) * _ewma;
}
// Step 2: CUSUM — accumulate only when traffic is significantly above baseline
_cusum = std::max(0.0, _cusum + current_rps - _ewma - _drift);
double predicted_lambda = _ewma;
if (_cusum > _threshold) {
// Spike alarm: pre-warm aggressively for the anticipated peak
logMessage(WARNING, "[Predictor] SPIKE DETECTED CUSUM=%.2f RPS=%d -> predicted_lambda=%.1f",
_cusum, current_rps, current_rps * 1.5);
predicted_lambda = current_rps * 1.5;
_cusum = 0; // reset after acting on the alarm
}
// Step 3: Little's Law N = ceil(lambda * T) + M_safety
int target = (int)std::ceil(predicted_lambda * _avg_service_time) + _safety_margin;
// Per-tick inference latency — emitted at NORMAL so it lands in
// server.log for paper Section 5.4 (Pareto/overhead) extraction.
// Grep handle: [PredLatency] mode=ewma us=...
long long us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - t0).count();
logMessage(NORMAL, "[PredLatency] mode=ewma us=%lld rps=%d target=%d",
us, current_rps, target);
logMessage(DEBUG, "[Predictor] RPS=%d EWMA=%.2f T=%.3fs Target=%d",
current_rps, _ewma, _avg_service_time, target);
return target;
}
// E7 — Standardized (adaptive) CUSUM:
// sigma_t = beta * |rps - ewma| + (1 - beta) * sigma_{t-1} (EWMSD)
// z_t = (rps - ewma) / max(sigma_t, min_sigma)
// cusum = max(0, cusum + z_t - k_std)
// alarm if cusum > h_std
//
// Motivation: the fixed drift/threshold variant is brittle when the true
// Ramp boundary lands mid-window and the observed RPS is artificially
// low. Normalising by a running sigma removes the RPS-magnitude
// dependence, so C2/C4 fire at the same Ramp window as C1/C3.
int AdaptivePredict(int current_rps) {
auto t0 = std::chrono::high_resolution_clock::now();
// Step 1: EWMA baseline (same as UpdateAndPredict).
if (_ewma == 0.0) {
_ewma = current_rps;
} else {
_ewma = _alpha * current_rps + (1.0 - _alpha) * _ewma;
}
double deviation = current_rps - _ewma;
// Step 2: EWMSD — track typical |deviation| as a running scale estimate.
// We deliberately do NOT lazy-init _sigma_ewma to the first abs_dev:
// that makes the first large deviation self-normalise to z=1, which
// is insufficient to fire within the Ramp window (observed in the
// warmup-sweep: adaptive C1 missed SPIKE at W=35s,60s because sigma
// bootstrapped from the ramp itself). Always applying the EWMA
// update means after a flat warmup (sigma≈0), the first ramp
// window's abs_dev≈10 gives sigma = beta·abs_dev ≈ 3, so
// z = abs_dev/sigma = 1/beta ≈ 3.3 — strong enough to cross
// cusum_std = h=3.0 within two windows.
double abs_dev = std::fabs(deviation);
_sigma_ewma = _beta * abs_dev + (1.0 - _beta) * _sigma_ewma;
double sigma_safe = std::max(_sigma_ewma, _min_sigma);
double z = deviation / sigma_safe;
// Step 3: Standardized CUSUM — accumulate z-scores above k_std only.
_cusum_std = std::max(0.0, _cusum_std + z - _k_std);
double predicted_lambda = _ewma;
if (_cusum_std > _h_std) {
logMessage(WARNING, "[Predictor/adaptive] SPIKE DETECTED CUSUM_std=%.2f z=%.2f sigma=%.2f RPS=%d -> predicted_lambda=%.1f",
_cusum_std, z, sigma_safe, current_rps, current_rps * 1.5);
predicted_lambda = current_rps * 1.5;
_cusum_std = 0; // reset after acting on the alarm
}
int target = (int)std::ceil(predicted_lambda * _avg_service_time) + _safety_margin;
long long us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - t0).count();
logMessage(NORMAL, "[PredLatency] mode=ewma_adaptive us=%lld rps=%d target=%d",
us, current_rps, target);
logMessage(DEBUG, "[Predictor/adaptive] RPS=%d EWMA=%.2f sigma=%.2f z=%.2f CUSUM_std=%.2f T=%.3fs Target=%d",
current_rps, _ewma, sigma_safe, z, _cusum_std, _avg_service_time, target);
return target;
}
// E2 — Reactive baseline:
// No EWMA smoothing, no CUSUM spike detection.
// Target is computed directly from the current observed RPS via Little's Law.
// Responds only to what just happened — never pre-warms ahead of demand.
int ReactivePredict(int current_rps) {
auto t0 = std::chrono::high_resolution_clock::now();
int target = (int)std::ceil(current_rps * _avg_service_time) + _safety_margin;
long long us = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - t0).count();
logMessage(NORMAL, "[PredLatency] mode=reactive us=%lld rps=%d target=%d",
us, current_rps, target);
logMessage(DEBUG, "[Predictor/Reactive] RPS=%d T=%.3fs Target=%d",
current_rps, _avg_service_time, target);
return target;
}
};
#endif // __PREDICTOR_HPP__