from __future__ import annotations

import hashlib
import json
import os
import time
from collections import deque
from datetime import UTC, datetime
from typing import TYPE_CHECKING

import structlog

from .. import metrics
from ..state.redis import get_redis, is_using_redis

if TYPE_CHECKING:
    from ..config import Settings
    from ..handlers import S3ProxyHandler

logger = structlog.get_logger(__name__)

# Redis key namespace for per-pod admin dashboard snapshots.
ADMIN_KEY_PREFIX = "s3proxy:admin:"
# Short TTL so pods that crash or stop publishing drop out of the
# multi-pod view automatically.
ADMIN_TTL_SECONDS = 30
# ---------------------------------------------------------------------------
# Rate tracker — sliding window over Prometheus counters
# ---------------------------------------------------------------------------


class RateTracker:
    """Track counter snapshots over a sliding window to compute per-minute rates.

    Prometheus counters are monotonically increasing totals; to display
    "per minute" figures we keep timestamped snapshots and derive the
    rate from the oldest/newest pair still inside the window.
    """

    def __init__(self, window_seconds: int = 300):
        self._window = window_seconds
        # (monotonic timestamp, {counter_name: value}) pairs, oldest first.
        self._snapshots: deque[tuple[float, dict[str, float]]] = deque()

    def record(self, counters: dict[str, float]) -> None:
        """Append a snapshot and evict entries older than the window."""
        now = time.monotonic()
        self._snapshots.append((now, counters))
        # 10s grace margin; always keep at least two snapshots so a rate
        # can still be computed even after a long idle gap.
        cutoff = now - self._window - 10
        while len(self._snapshots) > 2 and self._snapshots[0][0] < cutoff:
            self._snapshots.popleft()

    def rate_per_minute(self, key: str) -> float:
        """Return the per-minute rate of *key* over the recorded window.

        Returns 0.0 when fewer than two snapshots exist or the elapsed
        span is under one second (avoids noisy/divide-by-tiny rates).
        """
        if len(self._snapshots) < 2:
            return 0.0
        oldest_ts, oldest_vals = self._snapshots[0]
        newest_ts, newest_vals = self._snapshots[-1]
        elapsed = newest_ts - oldest_ts
        if elapsed < 1:
            return 0.0
        delta = newest_vals.get(key, 0) - oldest_vals.get(key, 0)
        # A counter reset (process restart) yields a negative delta;
        # clamp to zero rather than reporting a negative rate.
        return max(0.0, delta / elapsed * 60)


# Process-wide singleton shared by all collectors in this module.
_rate_tracker = RateTracker(window_seconds=300)


# ---------------------------------------------------------------------------
# Prometheus helpers
# ---------------------------------------------------------------------------
3365
3466
3567def _read_gauge (gauge ) -> float :
36- """Read current value from a Prometheus Gauge."""
3768 return gauge ._value .get ()
3869
3970
4071def _read_counter (counter ) -> float :
41- """Read current value from a Prometheus Counter."""
4272 return counter ._value .get ()
4373
4474
4575def _read_labeled_counter_sum (counter ) -> float :
46- """Sum all label combinations for a labeled counter."""
4776 total = 0.0
4877 for sample in counter .collect ()[0 ].samples :
4978 if sample .name .endswith ("_total" ):
@@ -52,32 +81,66 @@ def _read_labeled_counter_sum(counter) -> float:
5281
5382
5483def _read_labeled_gauge_sum (gauge ) -> float :
55- """Sum all label combinations for a labeled gauge."""
5684 total = 0.0
5785 for sample in gauge .collect ()[0 ].samples :
5886 total += sample .value
5987 return total
6088
6189
62- def collect_system_health (start_time : float ) -> dict :
63- """Collect system health metrics."""
90+ def _read_errors_by_class () -> tuple [float , float , float ]:
91+ """Read 4xx, 5xx, 503 counts from REQUEST_COUNT labels."""
92+ errors_4xx = 0.0
93+ errors_5xx = 0.0
94+ errors_503 = 0.0
95+ for sample in metrics .REQUEST_COUNT .collect ()[0 ].samples :
96+ if not sample .name .endswith ("_total" ):
97+ continue
98+ status = str (sample .labels .get ("status" , "" ))
99+ if status .startswith ("4" ):
100+ errors_4xx += sample .value
101+ elif status == "503" :
102+ errors_503 += sample .value
103+ errors_5xx += sample .value
104+ elif status .startswith ("5" ):
105+ errors_5xx += sample .value
106+ return errors_4xx , errors_5xx , errors_503
107+
108+
109+ # ---------------------------------------------------------------------------
110+ # Collectors
111+ # ---------------------------------------------------------------------------
112+
113+
def collect_pod_identity(settings: Settings, start_time: float) -> dict:
    """Collect pod identity for the dashboard header banner.

    Never exposes raw key material — only a truncated SHA-256
    fingerprint of the KEK is published.

    Args:
        settings: Application settings providing the KEK bytes.
        start_time: ``time.monotonic()`` value captured at process start.
    """
    return {
        # HOSTNAME is the pod name under Kubernetes; fall back for local runs.
        "pod_name": os.environ.get("HOSTNAME", "unknown"),
        "uptime_seconds": int(time.monotonic() - start_time),
        "storage_backend": "Redis (HA)" if is_using_redis() else "In-memory",
        "kek_fingerprint": hashlib.sha256(settings.kek).hexdigest()[:16],
    }
123+
def collect_health() -> dict:
    """Collect memory and in-flight health metrics plus error counts.

    Reads process-local Prometheus gauges/counters; all values are
    snapshots at call time.
    """
    memory_reserved = _read_gauge(metrics.MEMORY_RESERVED_BYTES)
    memory_limit = _read_gauge(metrics.MEMORY_LIMIT_BYTES)
    # Guard against an unset (zero) limit to avoid ZeroDivisionError.
    usage_pct = round(memory_reserved / memory_limit * 100, 1) if memory_limit > 0 else 0
    errors_4xx, errors_5xx, errors_503 = _read_errors_by_class()

    return {
        "memory_reserved_bytes": int(memory_reserved),
        "memory_limit_bytes": int(memory_limit),
        "memory_usage_pct": usage_pct,
        "requests_in_flight": int(_read_labeled_gauge_sum(metrics.REQUESTS_IN_FLIGHT)),
        "errors_4xx": int(errors_4xx),
        "errors_5xx": int(errors_5xx),
        "errors_503": int(errors_503),
    }
78141
def collect_throughput() -> dict:
    """Collect throughput counters and compute per-minute rates.

    Records a snapshot of the current counter totals into the module
    rate tracker, then returns sliding-window per-minute rates. Rates
    are 0.0 until at least two snapshots >1s apart have been recorded.
    """
    encrypt_ops = 0.0
    decrypt_ops = 0.0
    for sample in metrics.ENCRYPTION_OPERATIONS.collect()[0].samples:
        # Only *_total samples carry counts; skip *_created timestamps.
        if not sample.name.endswith("_total"):
            continue
        if sample.labels.get("operation") == "encrypt":
            encrypt_ops = sample.value
        elif sample.labels.get("operation") == "decrypt":
            decrypt_ops = sample.value

    total_requests = _read_labeled_counter_sum(metrics.REQUEST_COUNT)
    bytes_encrypted = _read_counter(metrics.BYTES_ENCRYPTED)
    bytes_decrypted = _read_counter(metrics.BYTES_DECRYPTED)
    errors_4xx, errors_5xx, errors_503 = _read_errors_by_class()

    counters = {
        "requests": total_requests,
        "encrypt_ops": encrypt_ops,
        "decrypt_ops": decrypt_ops,
        "bytes_encrypted": bytes_encrypted,
        "bytes_decrypted": bytes_decrypted,
        "errors_4xx": errors_4xx,
        "errors_5xx": errors_5xx,
        "errors_503": errors_503,
    }
    _rate_tracker.record(counters)

    return {
        "rates": {
            "requests_per_min": round(_rate_tracker.rate_per_minute("requests"), 1),
            "encrypt_per_min": round(_rate_tracker.rate_per_minute("encrypt_ops"), 1),
            "decrypt_per_min": round(_rate_tracker.rate_per_minute("decrypt_ops"), 1),
            "bytes_encrypted_per_min": int(_rate_tracker.rate_per_minute("bytes_encrypted")),
            "bytes_decrypted_per_min": int(_rate_tracker.rate_per_minute("bytes_decrypted")),
            "errors_4xx_per_min": round(_rate_tracker.rate_per_minute("errors_4xx"), 1),
            "errors_5xx_per_min": round(_rate_tracker.rate_per_minute("errors_5xx"), 1),
            "errors_503_per_min": round(_rate_tracker.rate_per_minute("errors_503"), 1),
        },
    }
97182
98183
async def collect_upload_status(handler: S3ProxyHandler) -> dict:
    """Collect active multipart upload status with stale detection.

    Annotates each upload dict in place with ``is_stale`` (older than
    30 minutes) and a human-readable size for the template.
    """
    uploads = await handler.multipart_manager.list_active_uploads()
    now = datetime.now(UTC)
    for upload in uploads:
        created = datetime.fromisoformat(upload["created_at"])
        if created.tzinfo is None:
            # assumes naive created_at timestamps were written in UTC —
            # TODO confirm against the multipart manager's writer.
            created = created.replace(tzinfo=UTC)
        age_seconds = (now - created).total_seconds()
        upload["is_stale"] = age_seconds > 1800
        upload["total_plaintext_size_formatted"] = _format_bytes(upload["total_plaintext_size"])
    return {
        "active_count": len(uploads),
        "uploads": uploads,
    }


# ---------------------------------------------------------------------------
# Redis pod metrics publishing (multi-pod view)
# ---------------------------------------------------------------------------
204+
205+
async def publish_pod_metrics(pod_data: dict) -> None:
    """Publish this pod's metrics to Redis so other pods can read them.

    Best-effort: the dashboard must never break because of a Redis
    hiccup, so failures are logged at debug level and swallowed.
    No-op when Redis is not the configured backend.
    """
    if not is_using_redis():
        return
    try:
        client = get_redis()
        pod_name = pod_data["pod"]["pod_name"]
        key = f"{ADMIN_KEY_PREFIX}{pod_name}"
        # TTL lets entries for dead pods expire on their own.
        await client.set(key, json.dumps(pod_data).encode(), ex=ADMIN_TTL_SECONDS)
    except Exception:
        logger.debug("Failed to publish pod metrics to Redis", exc_info=True)
218+
async def read_all_pod_metrics() -> list[dict]:
    """Read all pods' metrics snapshots from Redis.

    Returns an empty list when Redis is not in use or on any Redis
    error (best-effort, logged at debug level). Results are sorted by
    pod name for a stable dashboard ordering.
    """
    if not is_using_redis():
        return []
    try:
        client = get_redis()
        pods = []
        # SCAN (not KEYS) to avoid blocking Redis on large keyspaces.
        async for key in client.scan_iter(match=f"{ADMIN_KEY_PREFIX}*", count=100):
            data = await client.get(key)
            if data:
                pods.append(json.loads(data))
        pods.sort(key=lambda p: p.get("pod", {}).get("pod_name", ""))
        return pods
    except Exception:
        logger.debug("Failed to read pod metrics from Redis", exc_info=True)
        return []


# ---------------------------------------------------------------------------
# Formatters
# ---------------------------------------------------------------------------
240+
241+
99242def _format_bytes (n : int ) -> str :
100243 """Format bytes to human-readable string."""
101244 for unit in ("B" , "KB" , "MB" , "GB" , "TB" ):
@@ -119,25 +262,47 @@ def _format_uptime(seconds: int) -> str:
119262 return " " .join (parts )
120263
121264
# ---------------------------------------------------------------------------
# Aggregate
# ---------------------------------------------------------------------------


async def collect_all(
    settings: Settings,
    handler: S3ProxyHandler,
    start_time: float,
) -> dict:
    """Collect all dashboard data and publish to Redis for multi-pod view.

    Args:
        settings: Application settings (KEK fingerprinting).
        handler: Proxy handler providing the multipart upload manager.
        start_time: ``time.monotonic()`` value captured at process start.

    Returns:
        Local pod data plus ``uploads`` and, when Redis is in use,
        ``all_pods`` with every pod's published snapshot.
    """
    pod = collect_pod_identity(settings, start_time)
    health = collect_health()
    throughput = collect_throughput()
    upload_status = await collect_upload_status(handler)

    local_data = {
        "pod": pod,
        "health": health,
        "throughput": throughput,
        "formatted": {
            "memory_reserved": _format_bytes(health["memory_reserved_bytes"]),
            "memory_limit": _format_bytes(health["memory_limit_bytes"]),
            "uptime": _format_uptime(pod["uptime_seconds"]),
            "bytes_encrypted_per_min": _format_bytes(
                throughput["rates"]["bytes_encrypted_per_min"]
            ),
            "bytes_decrypted_per_min": _format_bytes(
                throughput["rates"]["bytes_decrypted_per_min"]
            ),
        },
    }

    # Publish this pod's data to Redis (fire-and-forget for other pods to see).
    await publish_pod_metrics(local_data)

    # Read all pods from Redis (includes this pod's just-published data).
    all_pods = await read_all_pod_metrics()

    return {
        **local_data,
        "uploads": upload_status,
        "all_pods": all_pods,
    }