"""
SpaceWatch – AI-Driven Observability Backend (Production Baseline, copy/paste)
Key goals this file satisfies:
- AI is the “driver”: user can ask anything; AI decides what tools to call.
- Backend sits between AI agent <-> DigitalOcean Spaces.
- Tools include: buckets, list objects, recent objects, storage summary, top largest,
metrics snapshots/series/sources, list/read/search .log.gz access logs, object audit timeline.
- Backend NEVER returns tool-call JSON to the frontend; it executes tool calls internally.
- Robust parsing: extracts the FIRST JSON object from agent output, even if the model adds text.
- Enforces: ONE tool call per agent turn. If model returns multiple JSON objects, backend requests a retry.
- Safe limits: rate limiting, max tool steps, max bytes, max files/lines scanned.
How “last file uploaded” + “from which IP” works with this file:
1) AI calls recent_objects(bucket=...)
2) AI picks the most recent object key
3) AI calls object_audit(source_bucket=..., object_key=..., methods=["PUT"])
4) AI answers with the IP + timestamp
IMPORTANT: This app can only answer questions from:
- Spaces object listing (sizes/last_modified)
- Access logs you store in ACCESS_LOGS_BUCKET (supports .log and .log.gz)
- Snapshots you write (every 5 min) into METRICS_BUCKET/METRICS_PREFIX
It cannot magically provide AWS CloudWatch / Azure Monitor without additional integrations.
"""
import os
import re
import time
from collections import defaultdict
import json
import gzip
import io
import sys
import traceback
import asyncio
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Optional, List, Dict, Any, Tuple, Set, Callable
import httpx
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse, Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, SecretStr
from collections import Counter, deque
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import logging
# Load environment variables from .env file
load_dotenv()
# ============================================================
# STRUCTURED LOGGING (AWS S3 / Azure Blob style)
# ============================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("spacewatch")
# ============================================================
# ENV / CONFIG
# ============================================================
# Default region and endpoint - can be overridden per request
DEFAULT_SPACES_REGION = os.getenv("SPACES_REGION", "sgp1")
DEFAULT_SPACES_ENDPOINT = os.getenv("SPACES_ENDPOINT", f"https://{DEFAULT_SPACES_REGION}.digitaloceanspaces.com")
DO_AGENT_URL = os.getenv("DO_AGENT_URL") # must be OpenAI-compatible chat completions endpoint
DO_AGENT_KEY = os.getenv("DO_AGENT_KEY")
# NOTE: ACCESS_LOGS_BUCKET and METRICS_BUCKET are no longer global defaults.
# Users must specify log_bucket and log_prefix per request.
METRICS_GZIP = os.getenv("METRICS_GZIP", "true").lower() in {"1", "true", "yes"}
# Optional: explicitly tell scheduler which source buckets to snapshot.
# If set, scheduler does NOT need list_buckets permission.
# Example: SCHEDULER_SOURCE_BUCKETS="prod-assets,prod-uploads"
SCHEDULER_SOURCE_BUCKETS = [b.strip() for b in os.getenv("SCHEDULER_SOURCE_BUCKETS", "").split(",") if b.strip()]
# Optional: protect endpoints with X-API-Key
APP_API_KEY = os.getenv("APP_API_KEY", "").strip()
# Optional fallback buckets if list_buckets() is disallowed
FALLBACK_BUCKETS = [b.strip() for b in os.getenv("FALLBACK_BUCKETS", "").split(",") if b.strip()]
# Safety limits
MAX_LOG_BYTES = int(os.getenv("MAX_LOG_BYTES", "10485760")) # 10MB per object read
MAX_LOG_LINES = int(os.getenv("MAX_LOG_LINES", "300")) # tail lines on read_log
MAX_LOG_FILES_SCAN = int(os.getenv("MAX_LOG_FILES_SCAN", "50"))
MAX_LOG_LINES_SCAN = int(os.getenv("MAX_LOG_LINES_SCAN", "50000"))
MAX_LIST_OBJECTS_RETURN = int(os.getenv("MAX_LIST_OBJECTS_RETURN", "200"))
MAX_TOP_LARGEST = int(os.getenv("MAX_TOP_LARGEST", "50"))
# Rate limiting
RATE_LIMIT_RPS = float(os.getenv("RATE_LIMIT_RPS", "2.0"))
RATE_LIMIT_BURST = int(os.getenv("RATE_LIMIT_BURST", "10"))
# Bucket cache
BUCKET_CACHE_TTL_SEC = int(os.getenv("BUCKET_CACHE_TTL_SEC", "300"))
# Agent call config
DO_AGENT_TIMEOUT_SEC = float(os.getenv("DO_AGENT_TIMEOUT_SEC", "60"))
DO_AGENT_RETRIES = int(os.getenv("DO_AGENT_RETRIES", "2"))
# Agent tool loop
AGENT_MAX_STEPS = int(os.getenv("AGENT_MAX_STEPS", "8"))
AGENT_MAX_TOOL_BYTES = int(os.getenv("AGENT_MAX_TOOL_BYTES", "250000"))
# Optional scheduler to write snapshots every 5 minutes
ENABLE_SCHEDULER = os.getenv("ENABLE_SCHEDULER", "false").lower() in {"1", "true", "yes"}
SNAPSHOT_EVERY_SEC = int(os.getenv("SNAPSHOT_EVERY_SEC", "300"))
# Optional leader gating if you run multiple replicas
SCHEDULER_INSTANCE_ID = os.getenv("SCHEDULER_INSTANCE_ID", "").strip()
SCHEDULER_LEADER_ID = os.getenv("SCHEDULER_LEADER_ID", "").strip()
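# Illustrative .env sketch (all values are placeholders, not values enforced by this
# file). Only DO_AGENT_URL and DO_AGENT_KEY are strictly required at startup; every
# other variable falls back to the defaults read above:
#
#   DO_AGENT_URL=https://example.invalid/v1/chat/completions
#   DO_AGENT_KEY=replace-me
#   SPACES_REGION=sgp1
#   APP_API_KEY=replace-me
#   ENABLE_SCHEDULER=true
#   SCHEDULER_SOURCE_BUCKETS=prod-assets,prod-uploads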
if not all([DO_AGENT_URL, DO_AGENT_KEY]):
print("\n" + "=" * 70)
print("❌ CONFIGURATION ERROR")
print("=" * 70)
print("\nMissing required environment variables: DO_AGENT_URL, DO_AGENT_KEY")
print("\nTo configure SpaceWatch, please run the interactive setup:")
print(" python setup.py")
print("\nOr manually create a .env file with the required variables.")
print("See sample.env for an example configuration.")
print("=" * 70 + "\n")
raise RuntimeError("Missing required configuration")
# ============================================================
# DYNAMIC S3 CLIENT CREATION
# ============================================================
def create_s3_client(access_key: str, secret_key: str, region: Optional[str] = None, endpoint: Optional[str] = None):
"""
Create an S3 client dynamically with provided user credentials.
This allows multi-tenant usage where each request uses its own credentials.
"""
if not access_key or not secret_key:
raise HTTPException(status_code=400, detail="Spaces access key and secret key are required")
region = region or DEFAULT_SPACES_REGION
endpoint = endpoint or DEFAULT_SPACES_ENDPOINT
return boto3.client(
"s3",
region_name=region,
endpoint_url=endpoint,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
config=Config(signature_version="s3v4"),
)
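# Minimal usage sketch for the factory above (the access key "DO00EXAMPLE" and its
# secret are placeholders):
#
#   client = create_s3_client("DO00EXAMPLE", "example-secret", region="sgp1")
#   resp = client.list_buckets()                      # standard boto3 S3 API call
#   names = [b["Name"] for b in resp.get("Buckets", [])]
#
# Because a fresh client is built per request, requests carrying different
# credentials never share a boto3 client.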
def get_credential_cache_key(access_key: str, region: Optional[str] = None, endpoint: Optional[str] = None) -> str:
"""
Generate a stable cache key for credential-scoped caching in BYOC scenarios.
Hash the tuple (access_key_id, endpoint_url, region) to create a unique identifier
for each set of credentials. This ensures that caches (buckets, IPs, metrics) are
isolated per credential set, preventing cross-user data leakage.
Note: We do NOT include the secret key in the cache key for security reasons.
The access key alone is sufficient to identify unique credential sets.
"""
region = region or DEFAULT_SPACES_REGION
endpoint = endpoint or DEFAULT_SPACES_ENDPOINT
# Create a stable string representation of the credential tuple
credential_tuple = f"{access_key}:{endpoint}:{region}"
# Hash it to create a stable, privacy-preserving cache key
return hashlib.sha256(credential_tuple.encode()).hexdigest()[:16]
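# Worked example of the derivation above (the access key is a placeholder): the
# cache key is the first 16 hex characters of sha256("ACCESS_KEY:ENDPOINT:REGION"),
# so it is deterministic for a given credential set and never exposes the secret:
#
#   k1 = get_credential_cache_key("DO00EXAMPLE")
#   k2 = get_credential_cache_key("DO00EXAMPLE")
#   assert k1 == k2 and len(k1) == 16
#
# Changing the region or endpoint changes the tuple being hashed, and therefore the
# resulting cache key.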
# ============================================================
# APP
# ============================================================
app = FastAPI(title="SpaceWatch – AI-driven Observability Backend (Prod Baseline)")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # tighten in production
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================================
# STORAGE OPERATION TRACKING MIDDLEWARE (S3/Azure Blob style)
# ============================================================
@app.middleware("http")
async def track_storage_operations(request: Request, call_next):
"""Middleware to track storage operation latency and performance"""
start_time = time.time()
# Determine operation type based on endpoint
operation_type = "UNKNOWN"
bucket = None
path = request.url.path
if path == "/" or path == "/config" or path.endswith("/config.html"):
operation_type = "CONFIG_PAGE"
elif path == "/dashboard" or path.endswith("/index.html"):
operation_type = "PAGE_LOAD"
elif "/static/" in path:
operation_type = "STATIC_ASSET"
elif "/health" in path:
operation_type = "HEALTH_CHECK"
elif "/stats" in path:
operation_type = "STATS"
elif "/tools/buckets" in path or "/metrics/sources" in path:
operation_type = "LIST_BUCKETS"
elif "/tools/list-all" in path or "/tools/storage-summary" in path:
operation_type = "LIST_OBJECTS"
bucket = request.query_params.get("bucket")
elif "/tools/top-largest" in path:
operation_type = "ANALYZE_STORAGE"
bucket = request.query_params.get("bucket")
elif "/chat" in path:
operation_type = "AI_QUERY"
elif "recent_objects" in path or path.startswith("/metrics/"):
operation_type = "QUERY_METRICS"
elif "/logs" in path or "access_logs" in path or "search_logs" in path:
operation_type = "QUERY_LOGS"
elif "/plots/" in path:
operation_type = "GENERATE_PLOT"
try:
response = await call_next(request)
duration_ms = (time.time() - start_time) * 1000
# Estimate bytes transferred from Content-Length header
bytes_transferred = 0
if "content-length" in response.headers:
try:
bytes_transferred = int(response.headers["content-length"])
except:
pass
# Record storage operation metrics
record_storage_operation(
endpoint=path,
method=request.method,
operation_type=operation_type,
duration_ms=duration_ms,
status_code=response.status_code,
bucket=bucket,
bytes_transferred=bytes_transferred
)
# Record Mission Control metrics
record_mission_control_request(
timestamp=start_time,
method=request.method,
path=path,
status_code=response.status_code,
duration_ms=duration_ms,
client_ip=request.client.host if request.client else "unknown"
)
# Add custom headers with metrics (like S3 request metrics)
response.headers["X-Request-Duration-Ms"] = str(round(duration_ms, 2))
response.headers["X-Request-Id"] = request.headers.get("X-Request-Id", f"req-{int(time.time() * 1000)}")
response.headers["X-Operation-Type"] = operation_type
# Structured logging for storage operations
logger.info(
f"STORAGE_OP: {operation_type} | {request.method} {path} | "
f"status={response.status_code} | duration={duration_ms:.2f}ms | "
f"bytes={bytes_transferred} | bucket={bucket or 'N/A'}"
)
return response
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
logger.error(
f"STORAGE_OP_ERROR: {operation_type} | {request.method} {path} | "
f"duration={duration_ms:.2f}ms | error={str(e)}"
)
raise
if os.path.isdir("static"):
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
def home():
"""Serve the configuration page as the landing page"""
if os.path.exists("static/config.html"):
return FileResponse("static/config.html")
return {"ok": True, "message": "SpaceWatch backend running"}
@app.get("/dashboard")
def dashboard():
"""Serve the main dashboard after configuration"""
if os.path.exists("static/index.html"):
return FileResponse("static/index.html")
return {"ok": True, "message": "SpaceWatch dashboard not found"}
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
traceback.print_exc()
return JSONResponse(status_code=500, content={"detail": f"Internal Server Error: {type(exc).__name__}: {str(exc)}"})
# ============================================================
# MODELS
# ============================================================
class ChatRequest(BaseModel):
message: str
spaces_key: SecretStr
spaces_secret: SecretStr
log_bucket: Optional[str] = None # User's bucket for access logs
log_prefix: Optional[str] = "" # Optional prefix for access logs
metrics_bucket: Optional[str] = None # User's bucket for metrics
metrics_prefix: Optional[str] = "spacewatch-metrics/"
region: Optional[str] = None
endpoint: Optional[str] = None
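# Illustrative /chat request body for the model above (bucket names, keys and the
# question are placeholders). spaces_key and spaces_secret are SecretStr, so their
# values are masked in the model's repr:
#
#   {
#     "message": "Which file was uploaded last, and from which IP?",
#     "spaces_key": "DO00EXAMPLE",
#     "spaces_secret": "***",
#     "log_bucket": "my-access-logs",
#     "log_prefix": "logs/",
#     "metrics_bucket": "my-metrics",
#     "metrics_prefix": "spacewatch-metrics/",
#     "region": "sgp1"
#   }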
# ============================================================
# RATE LIMIT + MEMORY (in-memory)
# ============================================================
@dataclass
class TokenBucket:
tokens: float
last_ts: float
@dataclass
class MemoryState:
last_bucket: Optional[str]
last_tool: Optional[str]
last_updated_ts: float
BUCKET_BY_IP: Dict[str, TokenBucket] = {}
MEMORY_BY_KEY: Dict[str, MemoryState] = {}
STATS = {
"chat_requests": 0,
"tool_requests": 0,
"agent_calls": 0,
"agent_failures": 0,
"bucket_cache_refreshes": 0,
"scheduler_runs": 0,
"scheduler_snapshots_ok": 0,
"scheduler_snapshots_err": 0,
}
# ============================================================
# ENHANCED STORAGE METRICS (S3/Azure Blob style)
# ============================================================
@dataclass
class StorageOperationMetrics:
"""Track storage operation latency and performance metrics"""
start_time: float
endpoint: str
method: str
operation_type: str # GET, PUT, DELETE, HEAD, LIST
bucket: Optional[str] = None
status_code: int = 0
duration_ms: float = 0.0
bytes_transferred: int = 0
# Metrics storage for storage operation tracking
STORAGE_OPERATION_METRICS: List[StorageOperationMetrics] = []
MAX_STORAGE_METRICS_SAMPLES = 10000 # Keep last 10k storage operations
MAX_LATENCY_SAMPLES = 10000 # Keep last 10k requests for latency tracking
def record_storage_operation(
endpoint: str,
method: str,
operation_type: str,
duration_ms: float,
status_code: int,
bucket: Optional[str] = None,
bytes_transferred: int = 0
):
"""Record storage operation metrics for analytics"""
metric = StorageOperationMetrics(
start_time=time.time(),
endpoint=endpoint,
method=method,
operation_type=operation_type,
bucket=bucket,
status_code=status_code,
duration_ms=duration_ms,
bytes_transferred=bytes_transferred
)
STORAGE_OPERATION_METRICS.append(metric)
# Keep only recent samples
if len(STORAGE_OPERATION_METRICS) > MAX_STORAGE_METRICS_SAMPLES:
STORAGE_OPERATION_METRICS.pop(0)
# Log slow operations (like S3 slow request logs)
if duration_ms > 5000: # > 5 seconds
logger.warning(
f"SLOW_OPERATION: {operation_type} {endpoint} - {duration_ms:.2f}ms - "
f"bucket={bucket} status={status_code}"
)
def calculate_percentiles(values: List[float], percentiles: List[int] = [50, 95, 99]) -> Dict[str, float]:
"""Calculate percentiles for latency metrics"""
if not values:
return {f"p{p}": 0.0 for p in percentiles}
sorted_vals = sorted(values)
result = {}
for p in percentiles:
idx = int(len(sorted_vals) * p / 100)
if idx >= len(sorted_vals):
idx = len(sorted_vals) - 1
result[f"p{p}"] = sorted_vals[idx]
return result
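# Worked example of the nearest-rank style indexing used above:
#
#   calculate_percentiles([10.0, 20.0, 30.0, 40.0, 50.0])
#   -> {"p50": 30.0, "p95": 50.0, "p99": 50.0}
#
# idx = int(5 * 50 / 100) = 2 selects the third sorted value for p50; for a list
# this small, p95 and p99 both land on the last sample.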
def get_storage_metrics() -> Dict[str, Any]:
"""Get storage operation metrics (S3/Azure Blob style analytics)"""
now = time.time()
recent_window = 300 # 5 minutes
recent_ops = [op for op in STORAGE_OPERATION_METRICS if now - op.start_time <= recent_window]
total_ops = len(recent_ops)
error_ops = len([op for op in recent_ops if op.status_code >= 400])
# Break down by operation type
ops_by_type = defaultdict(lambda: {"count": 0, "errors": 0, "latencies": [], "bytes": 0})
for op in recent_ops:
stats = ops_by_type[op.operation_type]
stats["count"] += 1
if op.status_code >= 400:
stats["errors"] += 1
stats["latencies"].append(op.duration_ms)
stats["bytes"] += op.bytes_transferred
# Calculate overall metrics
all_latencies = [op.duration_ms for op in recent_ops]
percentiles = calculate_percentiles(all_latencies)
# Operation breakdown
operation_breakdown = {}
for op_type, stats in ops_by_type.items():
operation_breakdown[op_type] = {
"request_count": stats["count"],
"error_count": stats["errors"],
"error_rate_percent": round(stats["errors"] / stats["count"] * 100, 2) if stats["count"] > 0 else 0,
"bytes_transferred": stats["bytes"],
**calculate_percentiles(stats["latencies"])
}
# Calculate operation rate
ops_per_second = total_ops / recent_window if recent_window > 0 else 0
error_rate = (error_ops / total_ops * 100) if total_ops > 0 else 0
# Calculate data transfer metrics
total_bytes_transferred = sum(op.bytes_transferred for op in recent_ops)
bytes_per_second = total_bytes_transferred / recent_window if recent_window > 0 else 0
return {
"total_operations_5m": total_ops,
"error_operations_5m": error_ops,
"error_rate_percent": round(error_rate, 2),
"operations_per_second": round(ops_per_second, 2),
"total_bytes_transferred_5m": total_bytes_transferred,
"bytes_per_second": round(bytes_per_second, 2),
"latency_p50_ms": round(percentiles.get("p50", 0), 2),
"latency_p95_ms": round(percentiles.get("p95", 0), 2),
"latency_p99_ms": round(percentiles.get("p99", 0), 2),
"avg_latency_ms": round(sum(all_latencies) / len(all_latencies), 2) if all_latencies else 0,
"operation_breakdown": operation_breakdown,
"timestamp": datetime.now(timezone.utc).isoformat()
}
# ============================================================
# MISSION CONTROL METRICS (Admin Dashboard)
# ============================================================
@dataclass
class RequestSample:
"""Individual request sample for Mission Control"""
timestamp: float
method: str
route: str # normalized route
status_code: int
duration_ms: float
client_ip: str
@dataclass
class MinuteBucket:
"""Per-minute aggregated metrics"""
requests: int = 0
errors_4xx: int = 0
errors_5xx: int = 0
duration_samples_ms: Any = None # deque
per_route_counts: Any = None # Counter
per_route_duration_samples: Any = None # dict of route -> deque
def __post_init__(self):
if self.duration_samples_ms is None:
self.duration_samples_ms = deque(maxlen=2000)
if self.per_route_counts is None:
self.per_route_counts = Counter()
if self.per_route_duration_samples is None:
self.per_route_duration_samples = {}
@dataclass
class MissionControlEvent:
"""Event for health status changes"""
ts: float
severity: str # info, warning, high, disaster
title: str
details: str
metric_before: Optional[float] = None
metric_after: Optional[float] = None
# Metrics storage for Mission Control
REQUEST_SAMPLES = deque(maxlen=20000)
MINUTE_BUCKETS: Dict[int, MinuteBucket] = {} # keyed by minute_ts (epoch // 60 * 60)
EVENTS = deque(maxlen=200)
# Snapshot for detecting changes
_LAST_SNAPSHOT = {
"timestamp": time.time(),
"req_per_min_5m": 0.0,
"p95_latency_5m": 0.0,
"error_5xx_rate_5m": 0.0,
"req_per_min_60m": 0.0,
}
# Server start time for uptime
SERVER_START_TIME = time.time()
def normalize_route(path: str) -> str:
"""Normalize route for grouping (remove IDs, etc.)"""
# Replace numeric IDs and hashes with placeholders
path = re.sub(r'/\d+', '/{id}', path)
path = re.sub(r'/[a-f0-9]{8,}', '/{hash}', path)
# Normalize admin routes
if path.startswith('/admin/api/'):
return path
return path
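# Illustrative inputs/outputs for the normalization above (paths are hypothetical):
#
#   normalize_route("/plots/12345")           -> "/plots/{id}"
#   normalize_route("/objects/abcdef0123456") -> "/objects/{hash}"
#   normalize_route("/admin/api/summary")     -> "/admin/api/summary"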
def record_mission_control_request(
timestamp: float,
method: str,
path: str,
status_code: int,
duration_ms: float,
client_ip: str
):
"""Record request for Mission Control monitoring"""
# Skip admin routes from monitoring themselves
if path.startswith('/admin/'):
return
route = normalize_route(path)
# Add to request samples
sample = RequestSample(
timestamp=timestamp,
method=method,
route=route,
status_code=status_code,
duration_ms=duration_ms,
client_ip=client_ip
)
REQUEST_SAMPLES.append(sample)
# Add to minute bucket
minute_ts = int(timestamp // 60 * 60)
if minute_ts not in MINUTE_BUCKETS:
MINUTE_BUCKETS[minute_ts] = MinuteBucket()
bucket = MINUTE_BUCKETS[minute_ts]
bucket.requests += 1
if 400 <= status_code < 500:
bucket.errors_4xx += 1
elif status_code >= 500:
bucket.errors_5xx += 1
bucket.duration_samples_ms.append(duration_ms)
bucket.per_route_counts[route] += 1
if route not in bucket.per_route_duration_samples:
bucket.per_route_duration_samples[route] = deque(maxlen=300)
bucket.per_route_duration_samples[route].append(duration_ms)
# Prune old buckets (keep last 24h)
prune_old_minute_buckets(timestamp)
def prune_old_minute_buckets(current_timestamp: float):
"""Remove buckets older than 24 hours"""
cutoff_ts = int((current_timestamp - 86400) // 60 * 60) # 24h ago
old_keys = [k for k in MINUTE_BUCKETS.keys() if k < cutoff_ts]
for k in old_keys:
del MINUTE_BUCKETS[k]
def get_minute_buckets_in_window(window_minutes: int) -> List[MinuteBucket]:
"""Get minute buckets for the specified time window"""
now = time.time()
cutoff_ts = int((now - window_minutes * 60) // 60 * 60)
return [bucket for ts, bucket in MINUTE_BUCKETS.items() if ts >= cutoff_ts]
def compute_req_per_min(window_minutes: int) -> float:
"""Compute requests per minute for a time window"""
buckets = get_minute_buckets_in_window(window_minutes)
if not buckets:
return 0.0
total_requests = sum(b.requests for b in buckets)
return total_requests / window_minutes if window_minutes > 0 else 0.0
def compute_error_rate(window_minutes: int, error_type: str = "5xx") -> float:
"""Compute error rate (percentage) for a time window"""
buckets = get_minute_buckets_in_window(window_minutes)
if not buckets:
return 0.0
total_requests = sum(b.requests for b in buckets)
if total_requests == 0:
return 0.0
if error_type == "4xx":
total_errors = sum(b.errors_4xx for b in buckets)
else: # 5xx
total_errors = sum(b.errors_5xx for b in buckets)
return (total_errors / total_requests) * 100
def compute_latency_percentile(window_minutes: int, percentile: int = 95) -> float:
"""Compute latency percentile from bounded samples in time window"""
buckets = get_minute_buckets_in_window(window_minutes)
if not buckets:
return 0.0
all_samples = []
for bucket in buckets:
all_samples.extend(bucket.duration_samples_ms)
if not all_samples:
return 0.0
sorted_samples = sorted(all_samples)
idx = int(len(sorted_samples) * percentile / 100)
if idx >= len(sorted_samples):
idx = len(sorted_samples) - 1
return sorted_samples[idx]
def compute_avg_latency(window_minutes: int) -> float:
"""Compute average latency in time window"""
buckets = get_minute_buckets_in_window(window_minutes)
if not buckets:
return 0.0
all_samples = []
for bucket in buckets:
all_samples.extend(bucket.duration_samples_ms)
if not all_samples:
return 0.0
return sum(all_samples) / len(all_samples)
def get_top_routes_by_volume(window_minutes: int, limit: int = 10) -> List[Dict[str, Any]]:
"""Get top routes by request volume"""
buckets = get_minute_buckets_in_window(window_minutes)
if not buckets:
return []
route_counts = Counter()
for bucket in buckets:
route_counts.update(bucket.per_route_counts)
top_routes = []
for route, count in route_counts.most_common(limit):
top_routes.append({
"route": route,
"count": count,
"req_per_min": count / window_minutes if window_minutes > 0 else 0
})
return top_routes
def get_slowest_routes(window_minutes: int, limit: int = 10) -> List[Dict[str, Any]]:
"""Get slowest routes by p95 latency"""
buckets = get_minute_buckets_in_window(window_minutes)
if not buckets:
return []
route_samples = defaultdict(list)
route_counts = Counter()
for bucket in buckets:
route_counts.update(bucket.per_route_counts)
for route, samples in bucket.per_route_duration_samples.items():
route_samples[route].extend(samples)
route_p95 = []
for route, samples in route_samples.items():
if samples:
sorted_samples = sorted(samples)
idx = int(len(sorted_samples) * 95 / 100)
if idx >= len(sorted_samples):
idx = len(sorted_samples) - 1
p95 = sorted_samples[idx]
route_p95.append({
"route": route,
"p95_latency_ms": round(p95, 2),
"count": route_counts[route]
})
# Sort by p95 descending
route_p95.sort(key=lambda x: x["p95_latency_ms"], reverse=True)
return route_p95[:limit]
def get_error_prone_routes(window_minutes: int, limit: int = 10) -> List[Dict[str, Any]]:
"""Get routes with highest 5xx error rate"""
# Get samples from the window
now = time.time()
cutoff = now - (window_minutes * 60)
route_requests = Counter()
route_5xx_errors = Counter()
for sample in REQUEST_SAMPLES:
if sample.timestamp >= cutoff:
route_requests[sample.route] += 1
if sample.status_code >= 500:
route_5xx_errors[sample.route] += 1
route_error_rates = []
for route in route_requests:
total = route_requests[route]
errors = route_5xx_errors[route]
if total > 0:
error_rate = (errors / total) * 100
if errors > 0: # Only include routes with actual errors
route_error_rates.append({
"route": route,
"error_5xx_count": errors,
"total_requests": total,
"error_5xx_rate": round(error_rate, 2)
})
# Sort by error rate descending
route_error_rates.sort(key=lambda x: x["error_5xx_rate"], reverse=True)
return route_error_rates[:limit]
def estimate_unique_sessions(window_minutes: int) -> int:
"""Estimate unique sessions from IPs or session memory"""
# Try to use MEMORY_BY_KEY for session tracking
now = time.time()
cutoff = now - (window_minutes * 60)
active_keys = set()
for key, state in MEMORY_BY_KEY.items():
if state.last_updated_ts >= cutoff:
active_keys.add(key)
if active_keys:
return len(active_keys)
# Fallback: count unique IPs from REQUEST_SAMPLES
unique_ips = set()
for sample in REQUEST_SAMPLES:
if sample.timestamp >= cutoff:
unique_ips.add(sample.client_ip)
return len(unique_ips)
def detect_health_events():
"""Detect health status changes and add events"""
global _LAST_SNAPSHOT
now = time.time()
uptime = now - SERVER_START_TIME
# Only check after 2 minutes of uptime
if uptime < 120:
return
# Get current metrics
current_req_per_min_5m = compute_req_per_min(5)
current_p95_latency_5m = compute_latency_percentile(5, 95)
current_5xx_rate_5m = compute_error_rate(5, "5xx")
current_req_per_min_60m = compute_req_per_min(60)
# Get previous snapshot
prev_req_per_min_5m = _LAST_SNAPSHOT.get("req_per_min_5m", 0)
prev_p95_latency_5m = _LAST_SNAPSHOT.get("p95_latency_5m", 0)
prev_5xx_rate_5m = _LAST_SNAPSHOT.get("error_5xx_rate_5m", 0)
prev_req_per_min_60m = _LAST_SNAPSHOT.get("req_per_min_60m", 0)
# Check latency jump
if prev_p95_latency_5m > 0:
latency_ratio = current_p95_latency_5m / prev_p95_latency_5m
latency_diff = current_p95_latency_5m - prev_p95_latency_5m
if latency_ratio > 2.0 and latency_diff > 200:
severity = "high" if latency_ratio > 3.0 else "warning"
event = MissionControlEvent(
ts=now,
severity=severity,
title=f"Latency spike detected",
details=f"P95 latency increased {latency_ratio:.1f}x ({prev_p95_latency_5m:.0f}ms → {current_p95_latency_5m:.0f}ms)",
metric_before=prev_p95_latency_5m,
metric_after=current_p95_latency_5m
)
EVENTS.append(event)
# Check 5xx rate thresholds
if current_5xx_rate_5m >= 5.0 and prev_5xx_rate_5m < 5.0:
event = MissionControlEvent(
ts=now,
severity="disaster",
title="Critical error rate",
details=f"5xx error rate at {current_5xx_rate_5m:.1f}%",
metric_before=prev_5xx_rate_5m,
metric_after=current_5xx_rate_5m
)
EVENTS.append(event)
elif current_5xx_rate_5m >= 2.0 and prev_5xx_rate_5m < 2.0:
event = MissionControlEvent(
ts=now,
severity="high",
title="High error rate",
details=f"5xx error rate at {current_5xx_rate_5m:.1f}%",
metric_before=prev_5xx_rate_5m,
metric_after=current_5xx_rate_5m
)
EVENTS.append(event)
elif current_5xx_rate_5m >= 1.0 and prev_5xx_rate_5m < 1.0:
event = MissionControlEvent(
ts=now,
severity="warning",
title="Elevated error rate",
details=f"5xx error rate at {current_5xx_rate_5m:.1f}%",
metric_before=prev_5xx_rate_5m,
metric_after=current_5xx_rate_5m
)
EVENTS.append(event)
# Check traffic drop
if uptime > 120 and prev_req_per_min_5m > 1.0 and current_req_per_min_5m < 0.1:
event = MissionControlEvent(
ts=now,
severity="disaster",
title="Traffic dropped to zero",
details=f"Request rate dropped from {prev_req_per_min_5m:.1f} to {current_req_per_min_5m:.1f} req/min",
metric_before=prev_req_per_min_5m,
metric_after=current_req_per_min_5m
)
EVENTS.append(event)
# Check traffic spike
baseline_60m = prev_req_per_min_60m if prev_req_per_min_60m > 0 else current_req_per_min_60m
if baseline_60m > 0 and current_req_per_min_5m > baseline_60m * 2:
severity = "info" if current_req_per_min_5m < baseline_60m * 3 else "warning"
event = MissionControlEvent(
ts=now,
severity=severity,
title="Traffic spike detected",
details=f"Request rate at {current_req_per_min_5m:.1f} req/min (baseline: {baseline_60m:.1f})",
metric_before=baseline_60m,
metric_after=current_req_per_min_5m
)
EVENTS.append(event)
# Update snapshot
_LAST_SNAPSHOT = {
"timestamp": now,
"req_per_min_5m": current_req_per_min_5m,
"p95_latency_5m": current_p95_latency_5m,
"error_5xx_rate_5m": current_5xx_rate_5m,
"req_per_min_60m": current_req_per_min_60m,
}
# ============================================================
# TOP IPs IMAGE CACHE (per-credential cached for BYOC safety)
# ============================================================
# Structure: { (credential_cache_key, bucket, date, limit): (timestamp, png_bytes) }
TOP_IPS_CACHE: Dict[Tuple[str, str, str, int], Tuple[float, bytes]] = {}
TOP_IPS_TTL = 120 # seconds
def client_ip(request: Request) -> str:
return request.client.host if request.client else "unknown"
def memory_key(request: Request, x_session_id: Optional[str], credential_cache_key: str) -> str:
"""
Generate a memory key for session/IP tracking that includes credential isolation.
In BYOC scenarios, we must ensure different credentials don't share memory state,
even if they're from the same IP or session. This prevents cross-user data leakage.
"""
base_key = ""
if x_session_id and x_session_id.strip():
base_key = f"sess:{x_session_id.strip()}"
else:
base_key = f"ip:{client_ip(request)}"
# Include credential cache key to isolate memory by credentials
return f"{base_key}:cred:{credential_cache_key}"
def rate_limit(ip: str):
now = time.time()
b = BUCKET_BY_IP.get(ip)
if not b:
b = TokenBucket(tokens=float(RATE_LIMIT_BURST), last_ts=now)
BUCKET_BY_IP[ip] = b
elapsed = now - b.last_ts
b.tokens = min(float(RATE_LIMIT_BURST), b.tokens + elapsed * RATE_LIMIT_RPS)
b.last_ts = now
if b.tokens < 1.0:
raise HTTPException(status_code=429, detail="Too Many Requests")
b.tokens -= 1.0
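# Worked example of the token-bucket refill above, assuming the default settings
# RATE_LIMIT_RPS=2.0 and RATE_LIMIT_BURST=10:
#
#   tokens=0.5, client idle for 3s -> min(10, 0.5 + 3 * 2.0) = 6.5 tokens
#   request arrives                -> allowed, tokens drops to 5.5
#   tokens below 1.0               -> the request is rejected with HTTP 429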
def update_memory(key: str, bucket: Optional[str], tool: Optional[str]):
MEMORY_BY_KEY[key] = MemoryState(last_bucket=bucket, last_tool=tool, last_updated_ts=time.time())
def get_memory(key: str) -> Optional[MemoryState]:
st = MEMORY_BY_KEY.get(key)
if not st:
return None
if time.time() - st.last_updated_ts > 1800:
MEMORY_BY_KEY.pop(key, None)
return None
return st
# ============================================================
# AUTH
# ============================================================
def require_api_key(x_api_key: Optional[str]):
if APP_API_KEY and x_api_key != APP_API_KEY:
raise HTTPException(status_code=401, detail="Unauthorized (missing/invalid X-API-Key)")
# ============================================================
# BUCKET DISCOVERY (per-credential cached for BYOC safety)
# ============================================================
# Structure: { credential_cache_key: (timestamp, bucket_set, last_error) }
_BUCKET_CACHE_BY_CREDENTIAL: Dict[str, Tuple[float, Set[str], Optional[str]]] = {}
def _seed_known_buckets(log_bucket: Optional[str] = None, metrics_bucket: Optional[str] = None) -> Set[str]:
"""
If list_buckets is not permitted (common with scoped keys),
allow explicitly configured buckets to still pass require_bucket_allowed().
"""
buckets = set()
if log_bucket:
buckets.add(log_bucket)
if metrics_bucket:
buckets.add(metrics_bucket)
if SCHEDULER_SOURCE_BUCKETS:
buckets.update(SCHEDULER_SOURCE_BUCKETS)
if FALLBACK_BUCKETS:
buckets.update(FALLBACK_BUCKETS)
return buckets
def refresh_bucket_cache(
s3_client,
credential_cache_key: str,
force: bool = False,
log_bucket: Optional[str] = None,
metrics_bucket: Optional[str] = None
) -> Set[str]:
"""
Refresh bucket cache for a specific credential set (BYOC-safe).
Each credential gets its own cache entry to prevent cross-user bucket leakage.
The credential_cache_key should be generated using get_credential_cache_key().
"""
now = time.time()
# Check if we have a valid cached entry for this credential
cached_entry = _BUCKET_CACHE_BY_CREDENTIAL.get(credential_cache_key)
if not force and cached_entry:
cache_ts, cached_buckets, _ = cached_entry
if cached_buckets and (now - cache_ts) < BUCKET_CACHE_TTL_SEC: