Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
.env
venv/
.venv/
dist/
build/
*.egg-info/
*.log
.pytest_cache/
.coverage
htmlcov/
5 changes: 5 additions & 0 deletions sentinel/.trace-tests-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
trace_tests:
enabled: true
test_data_dir: ./test_data
output_dir: ./test_reports
max_traces_per_run: 100
24 changes: 24 additions & 0 deletions sentinel/detectors/delegation_escalation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import List, Optional
from ..schemas import SentinelInput, DetectionResult


class DelegationEscalationDetector:
def __init__(self, max_depth: int = 2, risk_threshold: float = 0.8):
self.max_depth = max_depth
self.risk_threshold = risk_threshold

def detect(self, input_data: SentinelInput) -> DetectionResult:
depth = len(input_data.delegation_chain)
risk = 0.0
if depth > self.max_depth:
risk = min(1.0, 0.5 + 0.1 * (depth - self.max_depth))
elif depth == 2 and set(input_data.delegation_chain) == {"root", "admin"}:
risk = 0.7
elif depth == 1 and "root" in input_data.delegation_chain:
risk = 0.3

detected = risk >= self.risk_threshold
reason = None
if detected:
reason = f"Delegation depth {depth} exceeds threshold {self.max_depth}" if depth > self.max_depth else "Suspicious delegation pattern"
return DetectionResult(detected=detected, risk_score=risk, reason=reason)
12 changes: 12 additions & 0 deletions sentinel/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: '3.8'
services:
sentinel:
build: .
ports:
- "8001:8001"
environment:
- QUARANTINE_THRESHOLD=0.7
- TRACE_PRIVATE_KEY_PEM=optional
volumes:
- ./data:/app/data
restart: unless-stopped
6 changes: 4 additions & 2 deletions sentinel/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ metadata:
category: governance
maintainer:
name: Akhilesh Warik
email: akhilesh.warik@example.com

email: warik.akhilesh@gmail.com

github: a1k7
license: MIT
version: 2.0.0
Expand All @@ -28,4 +30,4 @@ spec:
evidence:
- type: manual
description: |
Agent Sentinel produces a risk score, detection list, quarantine action, and collusion patterns.
Agent Sentinel produces a risk score, detection list, quarantine action, and collusion patterns.
20 changes: 11 additions & 9 deletions sentinel/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
pydantic==2.10.4
pyyaml==6.0.2
httpx==0.28.1
jinja2==3.1.4
pandas==2.2.3
numpy==1.26.4
scikit-learn==1.5.2
fastapi>=0.115.0
uvicorn[standard]>=0.34.0
pydantic>=2.10.0
pytest>=8.0.0
httpx2>=0.22.0
python-dotenv>=1.0.0
click>=8.0.0
jinja2>=3.1.0
pandas>=2.0.0
numpy>=1.24.0
scikit-learn>=1.3.0
27 changes: 6 additions & 21 deletions sentinel/sample_trace.json
Original file line number Diff line number Diff line change
@@ -1,22 +1,7 @@
{
"trace_id": "sentinel-demo-001",
"steps": [
{
"step_index": 1,
"step_name": "Authorize",
"agent_id": "agent_alice",
"session_id": "session_live",
"policy_version": "v1",
"delegation_chain": ["root", "admin", "superadmin", "god"],
"observer_identity_hash": "abc123",
"reference_frame_hash": "def456",
"timestamp": "2026-06-17T12:00:00Z",
"tool_calls": [
{"name": "read_database", "args": {"query": "SELECT * FROM users"}},
{"name": "write_config", "args": {"config": "new_settings"}},
{"name": "delete_logs", "args": {"older_than": "30d"}},
{"name": "grant_permission", "args": {"user": "bob", "role": "admin"}}
]
}
]
}
"trace_id": "test-001",
"delegation_chain": ["root", "admin", "finance"],
"policy_version": "v1",
"agent_id": "alice",
"action": "write"
}
17 changes: 17 additions & 0 deletions sentinel/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from pydantic import BaseModel
from typing import List, Optional


class SentinelInput(BaseModel):
trace_id: str
delegation_chain: List[str]
policy_version: str
agent_id: str
action: str
# additional fields as needed


class DetectionResult(BaseModel):
detected: bool
risk_score: float
reason: Optional[str] = None
11 changes: 11 additions & 0 deletions sentinel/sentinel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .schemas import SentinelInput, DetectionResult
from .detectors.delegation_escalation import DelegationEscalationDetector
from .trace_claim_generator import TraceClaimGenerator, generate_trace_claim

__all__ = [
"SentinelInput",
"DetectionResult",
"DelegationEscalationDetector",
"TraceClaimGenerator",
"generate_trace_claim",
]
44 changes: 44 additions & 0 deletions sentinel/sentinel/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import click
import json
from datetime import datetime
from sentinel.risk_engine import RiskEngine
from sentinel.models import SentinelInput


class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)


@click.command()
@click.argument('trace_path', type=click.Path(exists=True))
@click.option('--output', '-o', type=click.Path(), help='Output JSON file for report')
def main(trace_path, output):
with open(trace_path, 'r') as f:
trace_data = json.load(f)

engine = RiskEngine()

input_data = SentinelInput(
trace_id=trace_data.get('trace_id', 'unknown'),
delegation_chain=trace_data.get('delegation_chain', []),
policy_version=trace_data.get('policy_version', 'v1'),
agent_id=trace_data.get('agent_id', 'unknown'),
action=trace_data.get('action', 'unknown')
)

result = engine.evaluate(input_data)
report = result.model_dump(mode='json')

if output:
with open(output, 'w') as f:
json.dump(report, f, indent=2, cls=DateTimeEncoder)
click.echo(f"Report written to {output}")
else:
click.echo(json.dumps(report, indent=2, cls=DateTimeEncoder))


if __name__ == "__main__":
main()
8 changes: 8 additions & 0 deletions sentinel/sentinel/detectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .delegation_escalation import DelegationEscalationDetector
from .base import BaseDetector

# Only expose the working detector
__all__ = [
"DelegationEscalationDetector",
"BaseDetector",
]
55 changes: 55 additions & 0 deletions sentinel/sentinel/detectors/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from abc import ABC, abstractmethod
from datetime import datetime
from sentinel.models import SentinelInput, DetectionResult, RiskLevel, Action
from typing import Optional, Dict, Any

class BaseDetector(ABC):
"""Base class for all anomaly detectors."""

@abstractmethod
def detect(self, input_data: SentinelInput) -> DetectionResult:
"""
Run detection logic and return a DetectionResult.
Subclasses must implement this method.
"""
pass

@abstractmethod
def name(self) -> str:
"""Return the detector's name."""
pass

def _create_result(
self,
detection_type: str,
risk_score: float,
reason: str,
action: Action = Action.MONITOR,
risk_level: RiskLevel = None,
evidence: Optional[Dict[str, Any]] = None
) -> DetectionResult:
"""
Helper method to create a consistent DetectionResult with action field.
"""
if risk_level is None:
risk_level = self._risk_level_from_score(risk_score)

return DetectionResult(
detection_type=detection_type,
risk_score=risk_score,
risk_level=risk_level,
reason=reason,
action=action,
timestamp=datetime.now().isoformat(),
evidence=evidence or {}
)

def _risk_level_from_score(self, score: float) -> RiskLevel:
"""Convert numeric risk score to RiskLevel enum."""
if score < 0.3:
return RiskLevel.LOW
if score < 0.6:
return RiskLevel.MEDIUM
if score < 0.8:
return RiskLevel.HIGH
return RiskLevel.CRITICAL
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel, CollusionPattern
from src.detectors.base import BaseDetector
from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel, CollusionPattern
from sentinel.detectors.base import BaseDetector
from typing import List
class CollusionDetector(BaseDetector):
"""
Expand Down Expand Up @@ -64,4 +64,8 @@ def _risk_level(self, score: float) -> RiskLevel:
if score < 0.3: return RiskLevel.LOW
if score < 0.6: return RiskLevel.MEDIUM
if score < 0.8: return RiskLevel.HIGH
return RiskLevel.CRITICAL
return RiskLevel.CRITICAL
if __name__ == "__main__":
# Quick test
from ..models import DetectionResult, RiskLevel
print("CollusionDetector loaded successfully.")
41 changes: 41 additions & 0 deletions sentinel/sentinel/detectors/delegation_escalation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Optional
from sentinel.models import SentinelInput, DetectionResult, Action, RiskLevel
from sentinel.detectors.base import BaseDetector


class DelegationEscalationDetector(BaseDetector):
def __init__(self, max_depth: int = 2, risk_threshold: float = 0.8):
self.max_depth = max_depth
self.risk_threshold = risk_threshold

def name(self) -> str:
return "delegation_escalation"

def detect(self, input_data: SentinelInput) -> DetectionResult:
depth = len(input_data.delegation_chain)
risk = 0.0
reason = "Delegation chain within limits"

if depth > self.max_depth:
risk = min(1.0, 0.5 + 0.2 * (depth - self.max_depth))
reason = f"Delegation depth {depth} exceeds threshold {self.max_depth}"
elif depth == 2 and set(input_data.delegation_chain) == {"root", "admin"}:
risk = 0.7
reason = "Moderate risk delegation pattern (root + admin)"
elif depth == 1 and "root" in input_data.delegation_chain:
risk = 0.3
reason = "Low risk delegation (root only)"
elif depth == 0:
risk = 0.0
reason = "No delegation chain"

action = Action.QUARANTINE if risk > 0.7 else Action.ESCALATE if risk > 0.4 else Action.MONITOR
risk_level = self._risk_level_from_score(risk)

return self._create_result(
detection_type="delegation_escalation",
risk_score=risk,
reason=reason,
action=action,
risk_level=risk_level
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel
from src.detectors.base import BaseDetector
from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel
from sentinel.detectors.base import BaseDetector

class IdentityDriftDetector(BaseDetector):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel
from src.detectors.base import BaseDetector
from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel
from sentinel.detectors.base import BaseDetector

class PolicyAvoidanceDetector(BaseDetector):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from src.models import SentinelInput, DetectionResult, DetectionType, RiskLevel
from src.detectors.base import BaseDetector
from sentinel.models import SentinelInput, DetectionResult, DetectionType, RiskLevel
from sentinel.detectors.base import BaseDetector

class ToolDriftDetector(BaseDetector):
"""
Expand Down
Loading
Loading