Skip to content

Commit 09cbbef

Browse files
Track timeouts in seconds
1 parent c3ed4e0 commit 09cbbef

4 files changed

Lines changed: 19 additions & 26 deletions

File tree

src/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ fn main() -> io::Result<()> {
102102

103103
// Global application state
104104
let locked = Arc::new(AtomicBool::new(false));
105-
let last_seen_ns = Arc::new(AtomicU64::new(0));
105+
let last_seen_s = Arc::new(AtomicU64::new(0));
106106

107107
let stats = Arc::new(Stats::new());
108108
let exit_code_set = Arc::new(AtomicU32::new(0));
@@ -136,7 +136,7 @@ fn main() -> io::Result<()> {
136136
let sock_mgrs_a = sock_mgrs.clone();
137137
let worker_id = worker_base;
138138
let locked_a = Arc::clone(&locked);
139-
let last_seen_a = Arc::clone(&last_seen_ns);
139+
let last_seen_a = Arc::clone(&last_seen_s);
140140
let stats_a = Arc::clone(&stats);
141141

142142
thread::spawn(move || {
@@ -159,7 +159,7 @@ fn main() -> io::Result<()> {
159159
let sock_mgr_b = Arc::clone(&sock_mgr);
160160
let worker_id = worker_base + 1;
161161
let locked_b = Arc::clone(&locked);
162-
let last_seen_b = Arc::clone(&last_seen_ns);
162+
let last_seen_b = Arc::clone(&last_seen_s);
163163
let stats_b = Arc::clone(&stats);
164164

165165
thread::spawn(move || {
@@ -181,7 +181,7 @@ fn main() -> io::Result<()> {
181181
let cfg_w = Arc::clone(&cfg);
182182
let sock_mgrs_w = sock_mgrs.clone();
183183
let locked_w = Arc::clone(&locked);
184-
let last_seen_w = Arc::clone(&last_seen_ns);
184+
let last_seen_w = Arc::clone(&last_seen_s);
185185
let exit_code_set_w = Arc::clone(&exit_code_set);
186186

187187
thread::spawn(move || {

src/net/payload.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ pub(crate) fn handle_payload_result(
163163
t_recv: Instant,
164164
cfg: &Config,
165165
stats: &Stats,
166-
last_seen_ns: &AtomicU64,
166+
last_seen_s: &AtomicU64,
167167
validated: &ValidatedPayload<'_>,
168168
send_res: &io::Result<bool>,
169169
sock_connected: bool,
@@ -172,7 +172,8 @@ pub(crate) fn handle_payload_result(
172172
) {
173173
match send_res {
174174
Ok(res) => {
175-
last_seen_ns.store(Stats::dur_ns(t_start, t_recv), AtomOrdering::Relaxed);
175+
let last_seen = t_recv.saturating_duration_since(t_start).as_secs();
176+
last_seen_s.store(last_seen, AtomOrdering::Relaxed);
176177
if cfg.stats_interval_mins != 0 {
177178
let t_send = Instant::now();
178179
stats.send_add(c2u, validated.len as u64, t_recv, t_send);

src/stats.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,9 @@ impl Stats {
6464
}
6565
}
6666

67-
#[inline]
68-
pub fn dur_ns(start: Instant, end: Instant) -> u64 {
69-
end.saturating_duration_since(start).as_nanos() as u64
70-
}
71-
7267
#[inline]
7368
pub fn send_add(&self, c2u: bool, bytes: u64, start: Instant, end: Instant) {
74-
let lat_ns = Self::dur_ns(start, end);
69+
let lat_ns = end.saturating_duration_since(start).as_nanos() as u64;
7570
let (atom_pkts, atom_bytes, atom_lat_sum_ns, atom_lat_max_ns, atom_bytes_max) = if c2u {
7671
(
7772
&self.agg.c2u_pkts,

src/worker.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,21 +148,18 @@ pub fn run_watchdog_thread(
148148
cfg: &Config,
149149
sock_mgrs: &[Arc<SocketManager>],
150150
locked: &AtomicBool,
151-
last_seen_ns: &AtomicU64,
151+
last_seen_s: &AtomicU64,
152152
exit_code_set: &AtomicU32,
153153
) {
154-
let timeout_ns = Duration::from_secs(cfg.timeout_secs)
155-
.as_nanos()
156-
.min(u128::from(u64::MAX)) as u64;
157154
let period = Duration::from_secs(1);
158155
loop {
159156
thread::sleep(period);
160157
if locked.load(AtomOrdering::Relaxed) {
161158
let now = Instant::now();
162-
let now_ns = Stats::dur_ns(t_start, now);
163-
let last_ns = last_seen_ns.load(AtomOrdering::Relaxed);
159+
let now_s = now.saturating_duration_since(t_start).as_secs();
160+
let last_s = last_seen_s.load(AtomOrdering::Relaxed);
164161

165-
if last_ns != 0 && now_ns.saturating_sub(last_ns) >= timeout_ns {
162+
if last_s != 0 && now_s.saturating_sub(last_s) >= cfg.timeout_secs {
166163
match cfg.on_timeout {
167164
TimeoutAction::Drop => {
168165
log_warn!(
@@ -191,7 +188,7 @@ pub fn run_watchdog_thread(
191188
}
192189
}
193190
locked.store(false, AtomOrdering::Relaxed);
194-
last_seen_ns.store(0, AtomOrdering::Relaxed);
191+
last_seen_s.store(0, AtomOrdering::Relaxed);
195192
}
196193
_ => {
197194
log_warn!(
@@ -213,7 +210,7 @@ pub fn run_upstream_to_client_thread(
213210
sock_mgr: &SocketManager,
214211
worker_id: usize,
215212
locked: &AtomicBool,
216-
last_seen_ns: &AtomicU64,
213+
last_seen_s: &AtomicU64,
217214
stats: &Stats,
218215
) {
219216
const C2U: bool = false;
@@ -260,7 +257,7 @@ pub fn run_upstream_to_client_thread(
260257
t_recv,
261258
cfg,
262259
stats,
263-
last_seen_ns,
260+
last_seen_s,
264261
&validated,
265262
&send_res,
266263
handles.client_connected,
@@ -288,7 +285,7 @@ pub fn run_client_to_upstream_thread(
288285
all_sock_mgrs: &[Arc<SocketManager>],
289286
worker_id: usize,
290287
locked: &AtomicBool,
291-
last_seen_ns: &AtomicU64,
288+
last_seen_s: &AtomicU64,
292289
stats: &Stats,
293290
) {
294291
const C2U: bool = true;
@@ -336,7 +333,7 @@ pub fn run_client_to_upstream_thread(
336333
t_recv,
337334
cfg,
338335
stats,
339-
last_seen_ns,
336+
last_seen_s,
340337
&validated,
341338
&send_res,
342339
handles.upstream_connected,
@@ -452,7 +449,7 @@ pub fn run_client_to_upstream_thread(
452449
t_recv,
453450
cfg,
454451
stats,
455-
last_seen_ns,
452+
last_seen_s,
456453
&validated,
457454
&send_res,
458455
handles.upstream_connected,
@@ -485,7 +482,7 @@ pub fn run_client_to_upstream_thread(
485482
t_recv,
486483
cfg,
487484
stats,
488-
last_seen_ns,
485+
last_seen_s,
489486
&validated,
490487
&send_res,
491488
handles.upstream_connected,

0 commit comments

Comments
 (0)