-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalert_router.py
More file actions
90 lines (74 loc) · 3.91 KB
/
alert_router.py
File metadata and controls
90 lines (74 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import json
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from server import models, schemas, auth
from server.database import get_db
router = APIRouter()
# API Endpoints for Alert Rule Management
@router.post("/alert-rules", status_code=status.HTTP_201_CREATED, response_model=schemas.AlertRule)
def create_alert_rule(rule: schemas.AlertRuleCreate, db: Session = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_active_user)):
db_rule = models.AlertRule(**rule.model_dump())
db.add(db_rule)
db.commit()
db.refresh(db_rule)
return db_rule
@router.get("/alert-rules", response_model=List[schemas.AlertRule])
def get_alert_rules(db: Session = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_active_user)):
rules = db.query(models.AlertRule).order_by(models.AlertRule.created_at.desc()).all()
return rules
@router.get("/alert-rules/{rule_id}", response_model=schemas.AlertRule)
def get_alert_rule(rule_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_active_user)):
rule = db.query(models.AlertRule).filter(models.AlertRule.id == rule_id).first()
if not rule:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Alert rule not found")
return rule
@router.put("/alert-rules/{rule_id}", response_model=schemas.AlertRule)
def update_alert_rule(rule_id: int, rule_update: schemas.AlertRuleUpdate, db: Session = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_active_user)):
db_rule = db.query(models.AlertRule).filter(models.AlertRule.id == rule_id).first()
if not db_rule:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Alert rule not found")
for key, value in rule_update.model_dump(exclude_unset=True).items():
setattr(db_rule, key, value)
db.commit()
db.refresh(db_rule)
return db_rule
@router.delete("/alert-rules/{rule_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_alert_rule(rule_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_active_user)):
db_rule = db.query(models.AlertRule).filter(models.AlertRule.id == rule_id).first()
if not db_rule:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Alert rule not found")
db.delete(db_rule)
db.commit()
return
# Endpoint to fetch recent logs globally
@router.get("/logs/recent", response_model=List[schemas.Log])
def get_recent_logs(
limit: int = Query(10, ge=1, le=100),
is_alert: Optional[bool] = None,
alert_status: Optional[str] = None,
severity: Optional[str] = None,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(auth.get_current_active_user)
):
"""Fetches recent log entries globally, with optional filtering."""
query = db.query(models.Log)
if is_alert is not None:
query = query.filter(models.Log.is_alert == is_alert)
if alert_status:
query = query.filter(models.Log.alert_status == alert_status)
if severity:
query = query.filter(models.Log.severity == severity)
logs = query.order_by(models.Log.timestamp.desc()).limit(limit).all()
return logs
# Endpoint to update a log entry's alert status
@router.put("/logs/{log_id}", response_model=schemas.Log)
def update_log_entry(log_id: int, log_update: schemas.LogUpdate, db: Session = Depends(get_db), current_user: schemas.User = Depends(auth.get_current_active_user)):
db_log = db.query(models.Log).filter(models.Log.id == log_id).first()
if not db_log:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Log entry not found")
for key, value in log_update.model_dump(exclude_unset=True).items():
setattr(db_log, key, value)
db.commit()
db.refresh(db_log)
return db_log