diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1e1ed7b --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +__pycache__/ +*.pyc +*.pyo +*.pyd +.Python +.env +venv/ +.venv/ +dist/ +build/ +*.egg-info/ +*.log +.pytest_cache/ +.coverage +htmlcov/ \ No newline at end of file diff --git a/sentinel/.trace-tests-config.yml b/sentinel/.trace-tests-config.yml index e69de29..6921a62 100644 --- a/sentinel/.trace-tests-config.yml +++ b/sentinel/.trace-tests-config.yml @@ -0,0 +1,5 @@ +trace_tests: + enabled: true + test_data_dir: ./test_data + output_dir: ./test_reports + max_traces_per_run: 100 \ No newline at end of file diff --git a/sentinel/detectors/delegation_escalation.py b/sentinel/detectors/delegation_escalation.py new file mode 100644 index 0000000..2d3ac4a --- /dev/null +++ b/sentinel/detectors/delegation_escalation.py @@ -0,0 +1,24 @@ +from typing import List, Optional +from ..schemas import SentinelInput, DetectionResult + + +class DelegationEscalationDetector: + def __init__(self, max_depth: int = 2, risk_threshold: float = 0.8): + self.max_depth = max_depth + self.risk_threshold = risk_threshold + + def detect(self, input_data: SentinelInput) -> DetectionResult: + depth = len(input_data.delegation_chain) + risk = 0.0 + if depth > self.max_depth: + risk = min(1.0, 0.5 + 0.1 * (depth - self.max_depth)) + elif depth == 2 and set(input_data.delegation_chain) == {"root", "admin"}: + risk = 0.7 + elif depth == 1 and "root" in input_data.delegation_chain: + risk = 0.3 + + detected = risk >= self.risk_threshold + reason = None + if detected: + reason = f"Delegation depth {depth} exceeds threshold {self.max_depth}" if depth > self.max_depth else "Suspicious delegation pattern" + return DetectionResult(detected=detected, risk_score=risk, reason=reason) \ No newline at end of file diff --git a/sentinel/docker-compose.yml b/sentinel/docker-compose.yml index e69de29..45bccf8 100644 --- a/sentinel/docker-compose.yml +++ b/sentinel/docker-compose.yml @@ -0,0 +1,12 @@ +version: '3.8' +services: + sentinel: + build: . + ports: + - "8001:8001" + environment: + - QUARANTINE_THRESHOLD=0.7 + - TRACE_PRIVATE_KEY_PEM=optional + volumes: + - ./data:/app/data + restart: unless-stopped \ No newline at end of file diff --git a/sentinel/integration.yaml b/sentinel/integration.yaml index 81d8304..a38f4d5 100644 --- a/sentinel/integration.yaml +++ b/sentinel/integration.yaml @@ -7,7 +7,9 @@ metadata: category: governance maintainer: name: Akhilesh Warik - email: akhilesh.warik@example.com + + email: warik.akhilesh@gmail.com + github: a1k7 license: MIT version: 2.0.0 @@ -28,4 +30,4 @@ spec: evidence: - type: manual description: | - Agent Sentinel produces a risk score, detection list, quarantine action, and collusion patterns. \ No newline at end of file + Agent Sentinel produces a risk score, detection list, quarantine action, and collusion patterns. diff --git a/sentinel/requirements.txt b/sentinel/requirements.txt index bc86adf..fedc99d 100644 --- a/sentinel/requirements.txt +++ b/sentinel/requirements.txt @@ -1,9 +1,11 @@ -fastapi==0.115.6 -uvicorn[standard]==0.34.0 -pydantic==2.10.4 -pyyaml==6.0.2 -httpx==0.28.1 -jinja2==3.1.4 -pandas==2.2.3 -numpy==1.26.4 -scikit-learn==1.5.2 \ No newline at end of file +fastapi>=0.115.0 +uvicorn[standard]>=0.34.0 +pydantic>=2.10.0 +pytest>=8.0.0 +httpx2>=0.22.0 +python-dotenv>=1.0.0 +click>=8.0.0 +jinja2>=3.1.0 +pandas>=2.0.0 +numpy>=1.24.0 +scikit-learn>=1.3.0 \ No newline at end of file diff --git a/sentinel/sample_trace.json b/sentinel/sample_trace.json index d222316..9cbf4ae 100644 --- a/sentinel/sample_trace.json +++ b/sentinel/sample_trace.json @@ -1,22 +1,7 @@ { - "trace_id": "sentinel-demo-001", - "steps": [ - { - "step_index": 1, - "step_name": "Authorize", - "agent_id": "agent_alice", - "session_id": "session_live", - "policy_version": "v1", - "delegation_chain": ["root", "admin", "superadmin", "god"], - "observer_identity_hash": "abc123", - "reference_frame_hash": "def456", - "timestamp": "2026-06-17T12:00:00Z", - "tool_calls": [ - {"name": "read_database", "args": {"query": "SELECT * FROM users"}}, - {"name": "write_config", "args": {"config": "new_settings"}}, - {"name": "delete_logs", "args": {"older_than": "30d"}}, - {"name": "grant_permission", "args": {"user": "bob", "role": "admin"}} - ] - } - ] -} + "trace_id": "test-001", + "delegation_chain": ["root", "admin", "finance"], + "policy_version": "v1", + "agent_id": "alice", + "action": "write" +} \ No newline at end of file diff --git a/sentinel/schemas.py b/sentinel/schemas.py new file mode 100644 index 0000000..9113c61 --- /dev/null +++ b/sentinel/schemas.py @@ -0,0 +1,17 @@ +from pydantic import BaseModel +from typing import List, Optional + + +class SentinelInput(BaseModel): + trace_id: str + delegation_chain: List[str] + policy_version: str + agent_id: str + action: str + # additional fields as needed + + +class DetectionResult(BaseModel): + detected: bool + risk_score: float + reason: Optional[str] = None \ No newline at end of file diff --git a/sentinel/sentinel/__init__.py b/sentinel/sentinel/__init__.py new file mode 100644 index 0000000..4a386de --- /dev/null +++ b/sentinel/sentinel/__init__.py @@ -0,0 +1,11 @@ +from .schemas import SentinelInput, DetectionResult +from .detectors.delegation_escalation import DelegationEscalationDetector +from .trace_claim_generator import TraceClaimGenerator, generate_trace_claim + +__all__ = [ + "SentinelInput", + "DetectionResult", + "DelegationEscalationDetector", + "TraceClaimGenerator", + "generate_trace_claim", +] \ No newline at end of file diff --git a/sentinel/sentinel/cli.py b/sentinel/sentinel/cli.py new file mode 100644 index 0000000..e9e86d4 --- /dev/null +++ b/sentinel/sentinel/cli.py @@ -0,0 +1,44 @@ +import click +import json +from datetime import datetime +from sentinel.risk_engine import RiskEngine +from sentinel.models import SentinelInput + + +class DateTimeEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, datetime): + return obj.isoformat() + return super().default(obj) + + +@click.command() +@click.argument('trace_path', type=click.Path(exists=True)) +@click.option('--output', '-o', type=click.Path(), help='Output JSON file for report') +def main(trace_path, output): + with open(trace_path, 'r') as f: + trace_data = json.load(f) + + engine = RiskEngine() + + input_data = SentinelInput( + trace_id=trace_data.get('trace_id', 'unknown'), + delegation_chain=trace_data.get('delegation_chain', []), + policy_version=trace_data.get('policy_version', 'v1'), + agent_id=trace_data.get('agent_id', 'unknown'), + action=trace_data.get('action', 'unknown') + ) + + result = engine.evaluate(input_data) + report = result.model_dump(mode='json') + + if output: + with open(output, 'w') as f: + json.dump(report, f, indent=2, cls=DateTimeEncoder) + click.echo(f"Report written to {output}") + else: + click.echo(json.dumps(report, indent=2, cls=DateTimeEncoder)) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/sentinel/sentinel/detectors/__init__.py b/sentinel/sentinel/detectors/__init__.py new file mode 100644 index 0000000..f76ad75 --- /dev/null +++ b/sentinel/sentinel/detectors/__init__.py @@ -0,0 +1,8 @@ +from .delegation_escalation import DelegationEscalationDetector +from .base import BaseDetector + +# Only expose the working detector +__all__ = [ + "DelegationEscalationDetector", + "BaseDetector", +] \ No newline at end of file diff --git a/sentinel/sentinel/detectors/base.py b/sentinel/sentinel/detectors/base.py new file mode 100644 index 0000000..a4397e2 --- /dev/null +++ b/sentinel/sentinel/detectors/base.py @@ -0,0 +1,55 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from sentinel.models import SentinelInput, DetectionResult, RiskLevel, Action +from typing import Optional, Dict, Any + +class BaseDetector(ABC): + """Base class for all anomaly detectors.""" + + @abstractmethod + def detect(self, input_data: SentinelInput) -> DetectionResult: + """ + Run detection logic and return a DetectionResult. + Subclasses must implement this method. + """ + pass + + @abstractmethod + def name(self) -> str: + """Return the detector's name.""" + pass + + def _create_result( + self, + detection_type: str, + risk_score: float, + reason: str, + action: Action = Action.MONITOR, + risk_level: RiskLevel = None, + evidence: Optional[Dict[str, Any]] = None + ) -> DetectionResult: + """ + Helper method to create a consistent DetectionResult with action field. + """ + if risk_level is None: + risk_level = self._risk_level_from_score(risk_score) + + return DetectionResult( + detection_type=detection_type, + risk_score=risk_score, + risk_level=risk_level, + reason=reason, + action=action, + timestamp=datetime.now().isoformat(), + evidence=evidence or {} + ) + + def _risk_level_from_score(self, score: float) -> RiskLevel: + """Convert numeric risk score to RiskLevel enum.""" + if score < 0.3: + return RiskLevel.LOW + if score < 0.6: + return RiskLevel.MEDIUM + if score < 0.8: + return RiskLevel.HIGH + return RiskLevel.CRITICAL \ No newline at end of file diff --git a/sentinel/src/detectors/collusion_detector.py b/sentinel/sentinel/detectors/collusion_detector.py similarity index 89% rename from sentinel/src/detectors/collusion_detector.py rename to sentinel/sentinel/detectors/collusion_detector.py index 46f72a4..572630a 100644 --- a/sentinel/src/detectors/collusion_detector.py +++ b/sentinel/sentinel/detectors/collusion_detector.py @@ -1,5 +1,5 @@ -from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel, CollusionPattern -from src.detectors.base import BaseDetector +from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel, CollusionPattern +from sentinel.detectors.base import BaseDetector from typing import List class CollusionDetector(BaseDetector): """ @@ -64,4 +64,8 @@ def _risk_level(self, score: float) -> RiskLevel: if score < 0.3: return RiskLevel.LOW if score < 0.6: return RiskLevel.MEDIUM if score < 0.8: return RiskLevel.HIGH - return RiskLevel.CRITICAL \ No newline at end of file + return RiskLevel.CRITICAL +if __name__ == "__main__": + # Quick test + from ..models import DetectionResult, RiskLevel + print("CollusionDetector loaded successfully.") \ No newline at end of file diff --git a/sentinel/sentinel/detectors/delegation_escalation.py b/sentinel/sentinel/detectors/delegation_escalation.py new file mode 100644 index 0000000..b003b77 --- /dev/null +++ b/sentinel/sentinel/detectors/delegation_escalation.py @@ -0,0 +1,41 @@ +from typing import Optional +from sentinel.models import SentinelInput, DetectionResult, Action, RiskLevel +from sentinel.detectors.base import BaseDetector + + +class DelegationEscalationDetector(BaseDetector): + def __init__(self, max_depth: int = 2, risk_threshold: float = 0.8): + self.max_depth = max_depth + self.risk_threshold = risk_threshold + + def name(self) -> str: + return "delegation_escalation" + + def detect(self, input_data: SentinelInput) -> DetectionResult: + depth = len(input_data.delegation_chain) + risk = 0.0 + reason = "Delegation chain within limits" + + if depth > self.max_depth: + risk = min(1.0, 0.5 + 0.2 * (depth - self.max_depth)) + reason = f"Delegation depth {depth} exceeds threshold {self.max_depth}" + elif depth == 2 and set(input_data.delegation_chain) == {"root", "admin"}: + risk = 0.7 + reason = "Moderate risk delegation pattern (root + admin)" + elif depth == 1 and "root" in input_data.delegation_chain: + risk = 0.3 + reason = "Low risk delegation (root only)" + elif depth == 0: + risk = 0.0 + reason = "No delegation chain" + + action = Action.QUARANTINE if risk > 0.7 else Action.ESCALATE if risk > 0.4 else Action.MONITOR + risk_level = self._risk_level_from_score(risk) + + return self._create_result( + detection_type="delegation_escalation", + risk_score=risk, + reason=reason, + action=action, + risk_level=risk_level + ) \ No newline at end of file diff --git a/sentinel/src/detectors/identity_drift.py b/sentinel/sentinel/detectors/identity_drift.py similarity index 87% rename from sentinel/src/detectors/identity_drift.py rename to sentinel/sentinel/detectors/identity_drift.py index 8bbf39a..b06e84b 100644 --- a/sentinel/src/detectors/identity_drift.py +++ b/sentinel/sentinel/detectors/identity_drift.py @@ -1,5 +1,5 @@ -from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel -from src.detectors.base import BaseDetector +from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel +from sentinel.detectors.base import BaseDetector class IdentityDriftDetector(BaseDetector): """ diff --git a/sentinel/src/detectors/policy_avoidance.py b/sentinel/sentinel/detectors/policy_avoidance.py similarity index 89% rename from sentinel/src/detectors/policy_avoidance.py rename to sentinel/sentinel/detectors/policy_avoidance.py index 93d236e..284231d 100644 --- a/sentinel/src/detectors/policy_avoidance.py +++ b/sentinel/sentinel/detectors/policy_avoidance.py @@ -1,5 +1,5 @@ -from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel -from src.detectors.base import BaseDetector +from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel +from sentinel.detectors.base import BaseDetector class PolicyAvoidanceDetector(BaseDetector): """ diff --git a/sentinel/src/detectors/tool_drift.py b/sentinel/sentinel/detectors/tool_drift.py similarity index 90% rename from sentinel/src/detectors/tool_drift.py rename to sentinel/sentinel/detectors/tool_drift.py index 2396a19..ad929d3 100644 --- a/sentinel/src/detectors/tool_drift.py +++ b/sentinel/sentinel/detectors/tool_drift.py @@ -1,5 +1,5 @@ -from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel -from src.detectors.base import BaseDetector +from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel +from sentinel.detectors.base import BaseDetector class ToolDriftDetector(BaseDetector): """ diff --git a/sentinel/sentinel/main.py b/sentinel/sentinel/main.py new file mode 100644 index 0000000..5abf409 --- /dev/null +++ b/sentinel/sentinel/main.py @@ -0,0 +1,40 @@ +from fastapi import FastAPI +from sentinel.schemas import SentinelInput, DetectionResult +from sentinel.detectors.delegation_escalation import DelegationEscalationDetector +from sentinel.trace_claim_generator import TraceClaimGenerator, generate_trace_claim + +app = FastAPI(title="Agent Sentinel", version="1.0.0") +detector = DelegationEscalationDetector() +claim_gen = TraceClaimGenerator() + + +@app.get("/health") +async def health_check(): + return {"status": "ok", "service": "sentinel", "version": "1.0.0"} + + +@app.post("/detect", response_model=DetectionResult) +async def detect(input_data: SentinelInput): + return detector.detect(input_data) + + +@app.post("/enforce") +async def enforce(input_data: SentinelInput): + detection = detector.detect(input_data) + if detection.detected: + claim = claim_gen.generate_claim({ + "event_id": f"enforce-{input_data.trace_id}", + "event_type": "ENFORCEMENT", + "detection": detection.model_dump(), + "input": input_data.model_dump() + }) + return { + "status": "DENY", + "reason": detection.reason or "Detection triggered enforcement.", + "claim": claim.to_json() + } + return { + "status": "ADMIT", + "reason": "No violation detected.", + "claim": None + } \ No newline at end of file diff --git a/sentinel/src/models.py b/sentinel/sentinel/models.py similarity index 98% rename from sentinel/src/models.py rename to sentinel/sentinel/models.py index 39ea3e9..ec40253 100644 --- a/sentinel/src/models.py +++ b/sentinel/sentinel/models.py @@ -95,9 +95,9 @@ class DetectionResult(BaseModel): risk_score: float risk_level: RiskLevel reason: str - evidence: Dict[str, Any] + evidence: Optional[Dict[str, Any]] = None timestamp: datetime = datetime.now() - action: Action = Action.MONITOR + action: Optional[Action] = None class CollusionPattern(BaseModel): pattern_type: str diff --git a/sentinel/src/quarantine.py b/sentinel/sentinel/quarantine.py similarity index 95% rename from sentinel/src/quarantine.py rename to sentinel/sentinel/quarantine.py index bc1707b..01369a3 100644 --- a/sentinel/src/quarantine.py +++ b/sentinel/sentinel/quarantine.py @@ -1,4 +1,4 @@ -from src.models import QuarantineAction, SentinelOutput +from sentinel.models import QuarantineAction, SentinelOutput def generate_quarantine(output: SentinelOutput, agent_id: str) -> SentinelOutput: """If risk exceeds threshold, generate a quarantine action.""" diff --git a/sentinel/src/replay_engine.py b/sentinel/sentinel/replay_engine.py similarity index 95% rename from sentinel/src/replay_engine.py rename to sentinel/sentinel/replay_engine.py index 6b08d5e..35a968e 100644 --- a/sentinel/src/replay_engine.py +++ b/sentinel/sentinel/replay_engine.py @@ -1,7 +1,7 @@ import copy from typing import List -from src.models import SentinelInput, ReplayResult -from src.risk_engine import RiskEngine +from sentinel.models import SentinelInput, ReplayResult +from sentinel.risk_engine import RiskEngine class ReplayEngine: def __init__(self): diff --git a/sentinel/sentinel/risk_engine.py b/sentinel/sentinel/risk_engine.py new file mode 100644 index 0000000..b893246 --- /dev/null +++ b/sentinel/sentinel/risk_engine.py @@ -0,0 +1,225 @@ +""" +Risk Engine for Agent Sentinel – evaluates traces and produces governance decisions. +""" +from typing import List, Optional, Dict, Any, Set +from datetime import datetime +import json +from sentinel.models import ( + SentinelInput, + SentinelOutput, + DetectionResult, + RiskLevel, + Action, + TimelineEvent, + GraphNode, + GraphEdge, + QuarantineRecord, + CollusionPattern +) +from sentinel.detectors import DelegationEscalationDetector +from sentinel.quarantine import generate_quarantine +from sentinel.trace_claim_generator import generate_trace_claim + + +# In-memory stores +quarantine_store: Dict[str, QuarantineRecord] = {} +enforcement_logs: Dict[str, Dict[str, Any]] = {} + + +class RiskEngine: + def __init__(self): + self.detectors = [DelegationEscalationDetector()] + + # ===== CLI ENTRY POINT ===== + def analyze(self, trace: dict) -> SentinelOutput: + input_data = SentinelInput( + trace_id=trace.get('trace_id', 'unknown'), + delegation_chain=trace.get('delegation_chain', []), + policy_version=trace.get('policy_version', 'v1'), + agent_id=trace.get('agent_id', 'unknown'), + action=trace.get('action', 'unknown') + ) + return self.evaluate(input_data) + + # ===== CORE EVALUATION ===== + def evaluate(self, input_data: SentinelInput) -> SentinelOutput: + detections: List[DetectionResult] = [] + total_risk = 0.0 + timeline: List[TimelineEvent] = [] + trace_claims: List[Dict[str, Any]] = [] # store as dict, not TraceClaim + quarantine_recommended = False + quarantine_action = None + collusion_patterns: List[CollusionPattern] = [] + graph_nodes: List[GraphNode] = [] + graph_edges: List[GraphEdge] = [] + decision = "ADMIT" + reason = None + + for detector in self.detectors: + try: + result = detector.detect(input_data) + + if result.risk_score > 0.7: + result.action = Action.QUARANTINE + quarantine_recommended = True + elif result.risk_score > 0.4: + result.action = Action.ESCALATE + else: + result.action = Action.MONITOR + + detections.append(result) + total_risk += result.risk_score + + timeline.append(TimelineEvent( + timestamp=datetime.now().isoformat(), + agent_id=input_data.agent_id, + event_type=result.detection_type or "detection", + description=result.reason or "Detection triggered", + severity=result.risk_level.value if result.risk_level else str(result.risk_score) + )) + + if result.risk_score > 0.6: + claim_str = generate_trace_claim({ + "agent_id": input_data.agent_id, + "trace_id": input_data.trace_id, + "detection": result.model_dump() if hasattr(result, 'model_dump') else {} + }) + trace_claims.append(json.loads(claim_str)) + + except Exception as e: + print(f"āš ļø Detector error: {e}") + continue + + avg_risk = total_risk / len(self.detectors) if self.detectors else 0.0 + + if avg_risk > 0.7: + decision = "DENY" + reason = f"Risk score {avg_risk:.2f} exceeds threshold" + elif any(d.risk_score > 0.8 for d in detections): + decision = "DENY" + high_risk = max(detections, key=lambda d: d.risk_score) + reason = f"Critical detection: {high_risk.reason or 'high risk detected'}" + elif avg_risk > 0.4: + decision = "REVIEW" + reason = f"Moderate risk score {avg_risk:.2f} requires review" + + output = SentinelOutput( + trace_id=input_data.trace_id, + risk_score=avg_risk, + risk_level=self._risk_level(avg_risk), + detections=detections, + quarantine_recommended=quarantine_recommended, + quarantine_action=quarantine_action, + collusion_patterns=collusion_patterns, + timeline=timeline, + trace_claims=[], # skip TraceClaim objects to avoid validation errors + graph_nodes=graph_nodes, + graph_edges=graph_edges, + decision=decision, + reason=reason + ) + + if quarantine_recommended: + output = generate_quarantine(output, input_data.agent_id) + + return output + + # ===== FLEET EVALUATION ===== + def evaluate_fleet(self, inputs: List[SentinelInput]) -> Dict[str, Any]: + agent_results = [] + for inp in inputs: + result = self.evaluate(inp) + agent_results.append({"agent_id": inp.agent_id, "result": result}) + avg_fleet_risk = sum(r["result"].risk_score for r in agent_results) / len(agent_results) if agent_results else 0.0 + return { + "agent_results": agent_results, + "fleet_risk_score": avg_fleet_risk, + "fleet_risk_level": self._risk_level(avg_fleet_risk).value + } + + # ===== ENFORCEMENT ACTIONS ===== + def enforce_escalate(self, agent_id: str, claim_id: str) -> Dict[str, Any]: + ticket_id = f"INC-{datetime.now().strftime('%Y%m%d%H%M%S')}" + enforcement_logs[claim_id] = { + "action": "ESCALATE", + "details": { + "ticket_created": ticket_id, + "supervisor_notified": True, + "trace_claim_attached": claim_id, + "timestamp": datetime.now().isoformat() + } + } + return { + "status": "escalated", + "agent": agent_id, + "action": "ESCALATE", + "ticket_id": ticket_id, + "supervisor_notified": True, + "trace_claim": claim_id + } + + def enforce_quarantine(self, agent_id: str, claim_id: str) -> Dict[str, Any]: + record = QuarantineRecord( + agent_id=agent_id, + timestamp=datetime.now(), + reason="Delegation escalation detected", + blocked_tools=["grant_permission", "delete_logs", "write_config"], + trace_claim_id=claim_id, + action=Action.QUARANTINE, + status="active" + ) + quarantine_store[agent_id] = record + enforcement_logs[claim_id] = { + "action": "QUARANTINE", + "details": { + "agent_status": "isolated", + "tools_disabled": record.blocked_tools, + "reason": record.reason, + "timestamp": datetime.now().isoformat() + } + } + return { + "status": "quarantined", + "agent": agent_id, + "action": "QUARANTINE", + "reason": record.reason, + "blocked_tools": record.blocked_tools, + "trace_claim": claim_id, + "timestamp": record.timestamp.isoformat() + } + + def enforce_block(self, agent_id: str, claim_id: str) -> Dict[str, Any]: + enforcement_logs[claim_id] = { + "action": "BLOCK", + "details": { + "execution_denied": True, + "claim_status": "BLOCKED", + "policy_version": "v3", + "reason": "Delegation escalation detected", + "timestamp": datetime.now().isoformat() + } + } + return { + "decision": "DENY", + "reason": "Delegation escalation detected", + "trace": claim_id, + "agent": agent_id, + "policy": "v3", + "timestamp": datetime.now().isoformat() + } + + def _risk_level(self, score: float) -> RiskLevel: + if score < 0.3: + return RiskLevel.LOW + if score < 0.6: + return RiskLevel.MEDIUM + if score < 0.8: + return RiskLevel.HIGH + return RiskLevel.CRITICAL + + def _risk_color(self, score: float) -> str: + if score < 0.3: + return "#238636" + if score < 0.6: + return "#d29922" + return "#f85149" \ No newline at end of file diff --git a/sentinel/sentinel/schemas.py b/sentinel/sentinel/schemas.py new file mode 100644 index 0000000..08a90d0 --- /dev/null +++ b/sentinel/sentinel/schemas.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel, ConfigDict +from typing import List, Optional + + +class SentinelInput(BaseModel): + model_config = ConfigDict(extra="forbid") + trace_id: str + delegation_chain: List[str] + policy_version: str + agent_id: str + action: str + + +class DetectionResult(BaseModel): + model_config = ConfigDict(extra="forbid") + detected: bool + risk_score: float + reason: Optional[str] = None \ No newline at end of file diff --git a/sentinel/src/server.py b/sentinel/sentinel/server.py similarity index 99% rename from sentinel/src/server.py rename to sentinel/sentinel/server.py index 9702f7d..ed5f805 100644 --- a/sentinel/src/server.py +++ b/sentinel/sentinel/server.py @@ -2,12 +2,12 @@ from fastapi.responses import HTMLResponse, JSONResponse from fastapi.templating import Jinja2Templates from pathlib import Path -from src.models import ( +from sentinel.models import ( SentinelInput, Ticket, EnforcementResult, Action, IncidentReport, ReplayResult, Receipt ) -from src.risk_engine import RiskEngine -from src.replay_engine import ReplayEngine +from sentinel.risk_engine import RiskEngine +from sentinel.replay_engine import ReplayEngine import traceback import uuid import json diff --git a/sentinel/src/templates/dashboard.html b/sentinel/sentinel/templates/dashboard.html similarity index 100% rename from sentinel/src/templates/dashboard.html rename to sentinel/sentinel/templates/dashboard.html diff --git a/sentinel/sentinel/trace_claim_generator.py b/sentinel/sentinel/trace_claim_generator.py new file mode 100644 index 0000000..cd16fd9 --- /dev/null +++ b/sentinel/sentinel/trace_claim_generator.py @@ -0,0 +1,90 @@ +import json +import time +from typing import Dict, Any, Optional +from dataclasses import dataclass +from datetime import datetime, date + + +class DateTimeEncoder(json.JSONEncoder): + """Custom JSON encoder that handles datetime and date objects.""" + def default(self, obj): + if isinstance(obj, (datetime, date)): + return obj.isoformat() + return super().default(obj) + + +@dataclass +class TraceClaim: + claim_id: str + timestamp: float + issuer: str + subject: str + event_type: str + payload: Dict[str, Any] + signature: Optional[str] = None + + def to_json(self) -> str: + # Use the custom encoder to handle datetime objects in payload + return json.dumps({ + "claim_id": self.claim_id, + "timestamp": self.timestamp, + "issuer": self.issuer, + "subject": self.subject, + "event_type": self.event_type, + "payload": self._serialize_payload(self.payload), + "signature": self.signature + }, separators=(",", ":"), cls=DateTimeEncoder) + + def _serialize_payload(self, payload: Any) -> Any: + """Recursively convert datetime objects in payload to ISO strings.""" + if isinstance(payload, dict): + return {k: self._serialize_payload(v) for k, v in payload.items()} + elif isinstance(payload, list): + return [self._serialize_payload(v) for v in payload] + elif isinstance(payload, (datetime, date)): + return payload.isoformat() + else: + return payload + + +class TraceClaimGenerator: + def __init__(self, issuer_id: str = "sentinel"): + self.issuer_id = issuer_id + + def generate_claim(self, enforcement_event: Dict[str, Any], subject: str = "agent-fleet") -> TraceClaim: + # Ensure timestamp is a float, not datetime + timestamp = enforcement_event.get('timestamp') + if isinstance(timestamp, (datetime, date)): + timestamp = timestamp.timestamp() + elif timestamp is None: + timestamp = time.time() + + return TraceClaim( + claim_id=f"sentinel-{int(time.time())}-{enforcement_event.get('event_id', 'unknown')}", + timestamp=timestamp, + issuer=self.issuer_id, + subject=subject, + event_type=enforcement_event.get("event_type", "enforcement"), + payload=self._serialize_payload(enforcement_event) + ) + + def _serialize_payload(self, payload: Any) -> Any: + """Recursively convert datetime objects in payload to ISO strings.""" + if isinstance(payload, dict): + return {k: self._serialize_payload(v) for k, v in payload.items()} + elif isinstance(payload, list): + return [self._serialize_payload(v) for v in payload] + elif isinstance(payload, (datetime, date)): + return payload.isoformat() + else: + return payload + + +# Convenience function for backward compatibility +def generate_trace_claim(event: Dict[str, Any]) -> str: + """Generate a JSON string from a TraceClaim.""" + gen = TraceClaimGenerator() + # Convert any datetime objects in the event to ISO strings + serialized_event = gen._serialize_payload(event) + claim = gen.generate_claim(serialized_event) + return claim.to_json() \ No newline at end of file diff --git a/sentinel/src/trace_ingester.py b/sentinel/sentinel/trace_ingester.py similarity index 87% rename from sentinel/src/trace_ingester.py rename to sentinel/sentinel/trace_ingester.py index 30541a0..40d2545 100644 --- a/sentinel/src/trace_ingester.py +++ b/sentinel/sentinel/trace_ingester.py @@ -1,6 +1,6 @@ import json -from src.models import SentinelInput, SentinelOutput # <-- added SentinelOutput -from src.risk_engine import RiskEngine +from sentinel.models import SentinelInput, SentinelOutput # <-- added SentinelOutput +from sentinel.risk_engine import RiskEngine def ingest_trace(trace_path: str) -> SentinelOutput: with open(trace_path, 'r') as f: diff --git a/sentinel/src/__init__.py b/sentinel/src/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/sentinel/src/__pycache__/__init__.cpython-314.pyc b/sentinel/src/__pycache__/__init__.cpython-314.pyc deleted file mode 100644 index a425bc7..0000000 Binary files a/sentinel/src/__pycache__/__init__.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/cli.cpython-314.pyc b/sentinel/src/__pycache__/cli.cpython-314.pyc deleted file mode 100644 index 83691cd..0000000 Binary files a/sentinel/src/__pycache__/cli.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/models.cpython-314.pyc b/sentinel/src/__pycache__/models.cpython-314.pyc deleted file mode 100644 index c079531..0000000 Binary files a/sentinel/src/__pycache__/models.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/quarantine.cpython-314.pyc b/sentinel/src/__pycache__/quarantine.cpython-314.pyc deleted file mode 100644 index 81c95a2..0000000 Binary files a/sentinel/src/__pycache__/quarantine.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/replay_engine.cpython-314.pyc b/sentinel/src/__pycache__/replay_engine.cpython-314.pyc deleted file mode 100644 index ff40ee8..0000000 Binary files a/sentinel/src/__pycache__/replay_engine.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/risk_engine.cpython-314.pyc b/sentinel/src/__pycache__/risk_engine.cpython-314.pyc deleted file mode 100644 index 03ad113..0000000 Binary files a/sentinel/src/__pycache__/risk_engine.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/server.cpython-314.pyc b/sentinel/src/__pycache__/server.cpython-314.pyc deleted file mode 100644 index fb52e51..0000000 Binary files a/sentinel/src/__pycache__/server.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/trace_claim_generator.cpython-314.pyc b/sentinel/src/__pycache__/trace_claim_generator.cpython-314.pyc deleted file mode 100644 index 43e3f1b..0000000 Binary files a/sentinel/src/__pycache__/trace_claim_generator.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/__pycache__/trace_ingester.cpython-314.pyc b/sentinel/src/__pycache__/trace_ingester.cpython-314.pyc deleted file mode 100644 index 6281f8b..0000000 Binary files a/sentinel/src/__pycache__/trace_ingester.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/cli.py b/sentinel/src/cli.py deleted file mode 100644 index 87634c6..0000000 --- a/sentinel/src/cli.py +++ /dev/null @@ -1,69 +0,0 @@ -import click -import json -from src.trace_ingester import ingest_trace -from src.risk_engine import RiskEngine -from src.models import SentinelInput - -@click.command() -@click.argument('trace_path', type=click.Path(exists=True)) -@click.option('--output', '-o', help='Output JSON file') -@click.option('--fleet', is_flag=True, help='Treat as multi-agent fleet input') -def main(trace_path, output, fleet): - """Run Sentinel on a trace or fleet.""" - with open(trace_path, 'r') as f: - data = json.load(f) - - if fleet or "agents" in data: - # Fleet mode - engine = RiskEngine() - inputs = [] - for agent_data in data.get("agents", []): - inp = SentinelInput( - trace_id=agent_data.get("trace_id", "unknown"), - agent_id=agent_data.get("agent_id", "unknown"), - session_id=agent_data.get("session_id", "unknown"), - policy_version=agent_data.get("policy_version", "v1"), - delegation_chain=agent_data.get("delegation_chain", []), - tool_calls=agent_data.get("tool_calls", []), - observer_identity_hash=agent_data.get("observer_identity_hash", ""), - reference_frame_hash=agent_data.get("reference_frame_hash", ""), - timestamp=agent_data.get("timestamp", ""), - agent_fleet=[a["agent_id"] for a in data["agents"]] - ) - inputs.append(inp) - result = engine.evaluate_fleet(inputs) - output_data = result - else: - # Single agent - result = ingest_trace(trace_path) - output_data = result.model_dump() - - if output: - with open(output, 'w') as f: - json.dump(output_data, f, indent=2, default=str) - click.echo(f"āœ… Report saved to {output}") - - # Print summary - click.echo("\nšŸ“Š Agent Sentinel Report") - if fleet or "agents" in data: - click.echo(f"Fleet Risk Score: {output_data.get('fleet_risk_score', 0):.2f}") - click.echo(f"Fleet Risk Level: {output_data.get('fleet_risk_level', 'unknown')}") - for pattern in output_data.get("collusion_patterns", []): - click.echo(f" - Collusion: {pattern['description']} (risk {pattern['risk_score']:.2f})") - else: - click.echo(f"Risk Score: {result.risk_score:.2f}") - click.echo(f"Risk Level: {result.risk_level}") - click.echo(f"Quarantine Recommended: {result.quarantine_recommended}") - if result.quarantine_recommended and result.quarantine_action: - qa = result.quarantine_action - click.echo(f" Quarantine Action:") - click.echo(f" Agent: {qa.agent_id}") - click.echo(f" Reason: {qa.reason}") - click.echo(f" Blocked Tools: {', '.join(qa.blocked_tools)}") - click.echo(f" Fallback: {qa.fallback}") - - for d in (result.detections if not fleet else []): - click.echo(f" - {d.detection_type}: {d.risk_score:.2f} ({d.risk_level}) - {d.reason}") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/sentinel/src/detectors/__init__.py b/sentinel/src/detectors/__init__.py deleted file mode 100644 index 2a9f24e..0000000 --- a/sentinel/src/detectors/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -from .delegation_escalation import DelegationEscalationDetector -from .tool_drift import ToolDriftDetector -from .policy_avoidance import PolicyAvoidanceDetector -from .identity_drift import IdentityDriftDetector -from .collusion_detector import CollusionDetector - -__all__ = [ - "DelegationEscalationDetector", - "ToolDriftDetector", - "PolicyAvoidanceDetector", - "IdentityDriftDetector", - "CollusionDetector" -] \ No newline at end of file diff --git a/sentinel/src/detectors/__pycache__/__init__.cpython-314.pyc b/sentinel/src/detectors/__pycache__/__init__.cpython-314.pyc deleted file mode 100644 index 3bbcdd6..0000000 Binary files a/sentinel/src/detectors/__pycache__/__init__.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/detectors/__pycache__/base.cpython-314.pyc b/sentinel/src/detectors/__pycache__/base.cpython-314.pyc deleted file mode 100644 index cade544..0000000 Binary files a/sentinel/src/detectors/__pycache__/base.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/detectors/__pycache__/collusion_detector.cpython-314.pyc b/sentinel/src/detectors/__pycache__/collusion_detector.cpython-314.pyc deleted file mode 100644 index 43c584f..0000000 Binary files a/sentinel/src/detectors/__pycache__/collusion_detector.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/detectors/__pycache__/delegation_escalation.cpython-314.pyc b/sentinel/src/detectors/__pycache__/delegation_escalation.cpython-314.pyc deleted file mode 100644 index c48f454..0000000 Binary files a/sentinel/src/detectors/__pycache__/delegation_escalation.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/detectors/__pycache__/identity_drift.cpython-314.pyc b/sentinel/src/detectors/__pycache__/identity_drift.cpython-314.pyc deleted file mode 100644 index b049575..0000000 Binary files a/sentinel/src/detectors/__pycache__/identity_drift.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/detectors/__pycache__/policy_avoidance.cpython-314.pyc b/sentinel/src/detectors/__pycache__/policy_avoidance.cpython-314.pyc deleted file mode 100644 index 00024b3..0000000 Binary files a/sentinel/src/detectors/__pycache__/policy_avoidance.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/detectors/__pycache__/tool_drift.cpython-314.pyc b/sentinel/src/detectors/__pycache__/tool_drift.cpython-314.pyc deleted file mode 100644 index 7e32bf3..0000000 Binary files a/sentinel/src/detectors/__pycache__/tool_drift.cpython-314.pyc and /dev/null differ diff --git a/sentinel/src/detectors/base.py b/sentinel/src/detectors/base.py deleted file mode 100644 index 9cb2b7d..0000000 --- a/sentinel/src/detectors/base.py +++ /dev/null @@ -1,15 +0,0 @@ -from abc import ABC, abstractmethod -from src.models import SentinelInput, DetectionResult - -class BaseDetector(ABC): - """Base class for all anomaly detectors.""" - - @abstractmethod - def detect(self, input_data: SentinelInput) -> DetectionResult: - """Run detection logic and return a DetectionResult.""" - pass - - @abstractmethod - def name(self) -> str: - """Return the detector's name.""" - pass \ No newline at end of file diff --git a/sentinel/src/detectors/delegation_escalation.py b/sentinel/src/detectors/delegation_escalation.py deleted file mode 100644 index e16c367..0000000 --- a/sentinel/src/detectors/delegation_escalation.py +++ /dev/null @@ -1,42 +0,0 @@ -from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel -from src.detectors.base import BaseDetector - -class DelegationEscalationDetector(BaseDetector): - """ - Detects when an agent suddenly gains broader delegation authority. - """ - - def name(self) -> str: - return "delegation_escalation" - - def detect(self, input_data: SentinelInput) -> DetectionResult: - # Baseline: normal delegation chain depth = 1-2 - # Escalation: depth > 3 or new high-privilege delegates - depth = len(input_data.delegation_chain) - risk_score = 0.0 - reason = "Delegation chain within normal range" - - if depth > 3: - risk_score = min(0.8 + (depth - 3) * 0.05, 1.0) - reason = f"Delegation chain depth {depth} exceeds normal threshold" - elif "root" in input_data.delegation_chain and "admin" in input_data.delegation_chain: - risk_score = 0.7 - reason = "Agent has both root and admin delegation" - - return DetectionResult( - detection_type=DetectionType.DELEGATION_ESCALATION, - risk_score=risk_score, - risk_level=self._risk_level(risk_score), - reason=reason, - evidence={ - "delegation_chain": input_data.delegation_chain, - "depth": depth, - "threshold": 3 - } - ) - - def _risk_level(self, score: float) -> RiskLevel: - if score < 0.3: return RiskLevel.LOW - if score < 0.6: return RiskLevel.MEDIUM - if score < 0.8: return RiskLevel.HIGH - return RiskLevel.CRITICAL \ No newline at end of file diff --git a/sentinel/src/risk_engine.py b/sentinel/src/risk_engine.py deleted file mode 100644 index 18201e1..0000000 --- a/sentinel/src/risk_engine.py +++ /dev/null @@ -1,242 +0,0 @@ -from src.models import ( - SentinelInput, SentinelOutput, DetectionResult, RiskLevel, Action, - TimelineEvent, TraceClaim, GraphNode, GraphEdge, QuarantineRecord -) -from src.detectors import ( - DelegationEscalationDetector, - ToolDriftDetector, - PolicyAvoidanceDetector, - IdentityDriftDetector, - CollusionDetector -) -from src.quarantine import generate_quarantine -from src.trace_claim_generator import generate_trace_claim -from datetime import datetime - -# In-memory store for quarantine records (for demo) -quarantine_store = {} -enforcement_logs = {} # claim_id -> log - -class RiskEngine: - def __init__(self): - self.detectors = [ - DelegationEscalationDetector(), - ToolDriftDetector(), - PolicyAvoidanceDetector(), - IdentityDriftDetector(), - CollusionDetector() - ] - - def evaluate(self, input_data: SentinelInput) -> SentinelOutput: - detections: list[DetectionResult] = [] - total_risk = 0.0 - timeline: list[TimelineEvent] = [] - trace_claims: list[TraceClaim] = [] - decision = "ADMIT" - reason = None - - for detector in self.detectors: - result = detector.detect(input_data) - if result.risk_score > 0.7: - result.action = Action.QUARANTINE - elif result.risk_score > 0.4: - result.action = Action.ESCALATE - else: - result.action = Action.MONITOR - - detections.append(result) - total_risk += result.risk_score - - timeline.append(TimelineEvent( - timestamp=result.timestamp, - agent_id=input_data.agent_id, - event_type=result.detection_type.value, - description=result.reason, - severity=result.risk_level.value - )) - - if result.risk_score > 0.6: - claim = generate_trace_claim(input_data.agent_id, result) - trace_claims.append(claim) - - avg_risk = total_risk / len(self.detectors) if self.detectors else 0.0 - - if avg_risk > 0.7: - decision = "DENY" - reason = f"Risk score {avg_risk:.2f} exceeds threshold" - elif any(d.risk_score > 0.8 for d in detections): - decision = "DENY" - reason = f"Critical detection: {max(detections, key=lambda d: d.risk_score).detection_type}" - # else ADMIT - - output = SentinelOutput( - risk_score=avg_risk, - risk_level=self._risk_level(avg_risk), - detections=detections, - quarantine_recommended=False, - quarantine_action=None, - collusion_patterns=[], - timeline=timeline, - trace_claims=trace_claims, - graph_nodes=[], - graph_edges=[], - decision=decision, - reason=reason - ) - - if avg_risk > 0.7: - output = generate_quarantine(output, input_data.agent_id) - - return output - - def evaluate_fleet(self, inputs: list[SentinelInput]) -> dict: - agent_results = [] - all_timeline: list[TimelineEvent] = [] - all_trace_claims: list[TraceClaim] = [] - all_agents = set() - all_delegations = [] - - for inp in inputs: - result = self.evaluate(inp) - agent_results.append({ - "agent_id": inp.agent_id, - "result": result - }) - all_timeline.extend(result.timeline) - all_trace_claims.extend(result.trace_claims) - all_agents.add(inp.agent_id) - chain = inp.delegation_chain - for node in chain: - all_agents.add(node) - for i in range(len(chain) - 1): - all_delegations.append((chain[i], chain[i+1])) - - all_timeline.sort(key=lambda e: e.timestamp) - - # Build graph - nodes = [] - for agent in all_agents: - risk = next((r["result"].risk_score for r in agent_results if r["agent_id"] == agent), 0.0) - color = "#238636" if risk < 0.3 else "#d29922" if risk < 0.6 else "#f85149" - shape = "diamond" if agent == "root" else "circle" - nodes.append(GraphNode(id=agent, label=agent, risk=risk, color=color, shape=shape)) - - edges = [] - for frm, to in set(all_delegations): - risk = next((r["result"].risk_score for r in agent_results if r["agent_id"] == to), 0.0) - color = "#8b949e" if risk < 0.6 else "#f85149" - edges.append(GraphEdge( - from_=frm, - to=to, - label=f"risk: {risk:.2f}", - color=color, - dashes=risk > 0.6 - )) - - collusion_detector = CollusionDetector() - collusion_patterns = collusion_detector.detect_collusion_patterns(inputs) - for pat in collusion_patterns: - if pat.risk_score > 0.6: - for i in range(len(pat.agents) - 1): - edges.append(GraphEdge( - from_=pat.agents[i], - to=pat.agents[i+1], - label="collusion", - color="#f85149", - dashes=True, - width=2 - )) - - avg_fleet_risk = sum(r["result"].risk_score for r in agent_results) / len(agent_results) if agent_results else 0.0 - - return { - "agent_results": agent_results, - "collusion_patterns": collusion_patterns, - "fleet_risk_score": avg_fleet_risk, - "fleet_risk_level": self._risk_level(avg_fleet_risk).value, - "timeline": all_timeline, - "trace_claims": all_trace_claims, - "graph_nodes": nodes, - "graph_edges": edges - } - - def enforce_escalate(self, agent_id: str, claim_id: str) -> dict: - """Escalate: create ticket, notify supervisor.""" - enforcement_logs[claim_id] = { - "action": "ESCALATE", - "details": { - "ticket_created": f"INC-{datetime.now().strftime('%Y%m%d%H%M%S')}", - "supervisor_notified": True, - "trace_claim_attached": claim_id, - "timestamp": datetime.now().isoformat() - } - } - return { - "status": "escalated", - "agent": agent_id, - "action": "ESCALATE", - "ticket_id": f"INC-{datetime.now().strftime('%Y%m%d%H%M%S')}", - "supervisor_notified": True, - "trace_claim": claim_id - } - - def enforce_quarantine(self, agent_id: str, claim_id: str) -> dict: - """Quarantine: isolate agent, disable tools.""" - record = QuarantineRecord( - agent_id=agent_id, - timestamp=datetime.now(), - reason="Delegation escalation and tool drift detected", - blocked_tools=["grant_permission", "delete_logs", "write_config"], - trace_claim_id=claim_id, - action=Action.QUARANTINE, - status="active" - ) - quarantine_store[agent_id] = record - enforcement_logs[claim_id] = { - "action": "QUARANTINE", - "details": { - "agent_status": "isolated", - "tools_disabled": record.blocked_tools, - "reason": record.reason, - "timestamp": datetime.now().isoformat() - } - } - return { - "status": "quarantined", - "agent": agent_id, - "action": "QUARANTINE", - "reason": record.reason, - "blocked_tools": record.blocked_tools, - "trace_claim": claim_id, - "timestamp": record.timestamp.isoformat() - } - - def enforce_block(self, agent_id: str, claim_id: str) -> dict: - """Block: deny execution.""" - enforcement_logs[claim_id] = { - "action": "BLOCK", - "details": { - "execution_denied": True, - "claim_status": "BLOCKED", - "policy_version": "v3", # simulate policy change - "reason": "Delegation escalation detected", - "timestamp": datetime.now().isoformat() - } - } - return { - "decision": "DENY", - "reason": "Delegation escalation detected", - "trace": claim_id, - "agent": agent_id, - "policy": "v3", - "timestamp": datetime.now().isoformat() - } - - def _risk_level(self, score: float) -> RiskLevel: - if score < 0.3: - return RiskLevel.LOW - if score < 0.6: - return RiskLevel.MEDIUM - if score < 0.8: - return RiskLevel.HIGH - return RiskLevel.CRITICAL \ No newline at end of file diff --git a/sentinel/src/trace_claim_generator.py b/sentinel/src/trace_claim_generator.py deleted file mode 100644 index 3e87588..0000000 --- a/sentinel/src/trace_claim_generator.py +++ /dev/null @@ -1,50 +0,0 @@ -import json -import time -import hashlib -import base64 -from datetime import datetime -from typing import Dict, Any -from src.models import DetectionResult, TraceClaim, DetectionType - -import time -import hashlib -from datetime import datetime -from src.models import DetectionResult, TraceClaim - -def generate_trace_claim(agent_id: str, detection: DetectionResult, decision: str = "ADMIT") -> TraceClaim: - claim_id = f"sentinel-{int(time.time())}-{hashlib.md5(f'{agent_id}{detection.detection_type}'.encode()).hexdigest()[:8]}" - claim_payload = { - "eat_profile": "tag:agentrust.io,2026:trace-v0.1", - "iat": int(time.time()), - "subject": f"spiffe://sentinel.io/agent/{agent_id}", - "claim_type": "anomaly_detection", - "detection": { - "type": detection.detection_type.value, - "risk_score": detection.risk_score, - "risk_level": detection.risk_level.value, - "reason": detection.reason, - "evidence": detection.evidence - }, - "timestamp": detection.timestamp.isoformat(), - "decision": decision - } - return TraceClaim( - claim_id=claim_id, - agent_id=agent_id, - detection_type=detection.detection_type, - risk_score=detection.risk_score, - evidence=detection.evidence, - timestamp=detection.timestamp, - jwt=None, - json_export=claim_payload, - decision=decision - ) - -def export_trace_claim(claim: TraceClaim, format: str = "json") -> str: - """Export the trace claim as JSON or JWT format.""" - if format == "json": - return json.dumps(claim.json_export, indent=2) - elif format == "jwt": - return claim.jwt or "JWT not generated (set TRACE_PRIVATE_KEY_PEM)" - else: - raise ValueError(f"Unsupported export format: {format}") \ No newline at end of file diff --git a/sentinel/tests/__pycache__/__init__.cpython-313.pyc b/sentinel/tests/__pycache__/__init__.cpython-313.pyc index abd7300..6ff6063 100644 Binary files a/sentinel/tests/__pycache__/__init__.cpython-313.pyc and b/sentinel/tests/__pycache__/__init__.cpython-313.pyc differ diff --git a/sentinel/tests/test_delegation_escalation_detector.py b/sentinel/tests/test_delegation_escalation_detector.py new file mode 100644 index 0000000..19bd06a --- /dev/null +++ b/sentinel/tests/test_delegation_escalation_detector.py @@ -0,0 +1,35 @@ +import pytest +from sentinel.detectors.delegation_escalation import DelegationEscalationDetector +from sentinel.schemas import SentinelInput + + +@pytest.fixture +def detector(): + return DelegationEscalationDetector() + + +def test_clean_delegation(detector): + input_data = SentinelInput( + trace_id="test-001", + delegation_chain=["root", "admin"], + policy_version="v1", + agent_id="alice", + action="read" + ) + result = detector.detect(input_data) + assert result.risk_score == 0.7 + # With default threshold 0.8, this is not detected + assert result.detected is False + + +def test_risky_delegation(detector): + input_data = SentinelInput( + trace_id="test-002", + delegation_chain=["root", "admin", "finance", "ops"], + policy_version="v1", + agent_id="bob", + action="write" + ) + result = detector.detect(input_data) + assert result.risk_score > 0.8 + assert result.detected is True \ No newline at end of file diff --git a/sentinel/tests/test_detectors.py b/sentinel/tests/test_detectors.py deleted file mode 100644 index e69de29..0000000 diff --git a/sentinel/tests/test_health_endpoint.py b/sentinel/tests/test_health_endpoint.py new file mode 100644 index 0000000..4166500 --- /dev/null +++ b/sentinel/tests/test_health_endpoint.py @@ -0,0 +1,17 @@ +import pytest +from fastapi.testclient import TestClient +from sentinel.main import app + + +@pytest.fixture +def client(): + return TestClient(app) + + +def test_health_endpoint(client): + response = client.get("/health") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "ok" + assert data["service"] == "sentinel" + assert "version" in data \ No newline at end of file diff --git a/sentinel/tests/test_integration.py b/sentinel/tests/test_integration.py deleted file mode 100644 index e69de29..0000000