Skip to content

Commit a259dc9

Browse files
committed
monitor
1 parent 3989086 commit a259dc9

2 files changed

Lines changed: 73 additions & 0 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod consumer;
33
pub mod notifier;
44
pub mod stream;
55
pub mod upload;
6+
pub mod monitor;
67
mod types;
78

89
/// This includes the most common types in this crate, re-exported for your convenience.

src/monitor.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use std::sync::atomic::AtomicU64;
2+
3+
use parking_lot::Mutex;
4+
5+
#[derive(Debug)]
6+
pub struct StreamHealthMonitor {
7+
pub requests_sent: AtomicU64,
8+
pub requests_failed: AtomicU64,
9+
pub last_active: Mutex<Option<std::time::Instant>>,
10+
pub last_failure: Mutex<Option<std::time::Instant>>,
11+
pub last_success: Mutex<Option<std::time::Instant>>,
12+
}
13+
14+
impl Default for StreamHealthMonitor {
15+
fn default() -> Self {
16+
Self {
17+
requests_sent: AtomicU64::new(0),
18+
requests_failed: AtomicU64::new(0),
19+
last_active: Mutex::new(None),
20+
last_failure: Mutex::new(None),
21+
last_success: Mutex::new(None),
22+
}
23+
}
24+
}
25+
26+
impl StreamHealthMonitor {
27+
pub fn new() -> Self {
28+
Self::default()
29+
}
30+
31+
pub fn reset(&self) {
32+
self.requests_sent
33+
.store(0, std::sync::atomic::Ordering::Relaxed);
34+
self.requests_failed
35+
.store(0, std::sync::atomic::Ordering::Relaxed);
36+
*self.last_active.lock() = None;
37+
}
38+
39+
pub fn is_idle(&self, window: std::time::Duration) -> bool {
40+
let last_active = self.last_active.lock();
41+
if let Some(last) = *last_active {
42+
last.elapsed() > window
43+
} else {
44+
true
45+
}
46+
}
47+
48+
pub fn has_recent_failure(&self, window: std::time::Duration) -> bool {
49+
let last_failure = self.last_failure.lock();
50+
if let Some(last) = *last_failure {
51+
last.elapsed() <= window
52+
} else {
53+
false
54+
}
55+
}
56+
57+
pub fn record_success(&self) {
58+
self.requests_sent
59+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
60+
*self.last_success.lock() = Some(std::time::Instant::now());
61+
*self.last_active.lock() = Some(std::time::Instant::now());
62+
}
63+
64+
pub fn record_failure(&self) {
65+
self.requests_failed
66+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
67+
self.requests_sent
68+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
69+
*self.last_failure.lock() = Some(std::time::Instant::now());
70+
*self.last_active.lock() = Some(std::time::Instant::now());
71+
}
72+
}

0 commit comments

Comments
 (0)