183 changes: 183 additions & 0 deletions docs/tutorials/kill-switch.md
@@ -0,0 +1,183 @@
# AGT SRE Kill Switch

Automatically block a rogue agent identity when its deny rate exceeds a threshold, and produce hardware-attested evidence of the enforcement action.

## What you'll learn

- How to configure the rolling-window kill switch in `cmcp-config.yaml`
- What happens when an agent trips the threshold
- How to read `kill_switch_triggered` in a TRACE claim
- How to unblock an agent identity (operator action)

## Prerequisites

```bash
pip install cmcp-runtime
```

An [Agent Manifest](../../docs/spec/component-model.md) must be bound to the gateway so the runtime has a per-agent SPIFFE URI to track. Anonymous sessions (no manifest) are never blocked.

---

## Background

In a production deployment an agent can go rogue: a bug, a prompt injection, or a misconfiguration causes it to request tool calls that policy forbids. Without automated remediation, the agent keeps running — accumulating denies in the audit chain but never stopping.

The kill switch closes this gap. cMCP tracks policy decisions per agent identity in a rolling time window. When the deny rate reaches a configurable threshold and the window holds enough samples, the runtime:

1. Marks the closing TRACE claim with `gateway.kill_switch_triggered: true` — hardware-attested evidence of automated enforcement, verifiable offline by any regulator
2. Blocks all subsequent `create_session()` calls from that agent identity with a `KILL_SWITCH_TRIPPED (403)` response
3. Appends a `break_glass_used` audit entry to the chain recording the trigger event

No other MCP gateway can produce this artifact. The operator or the sovereign oversight body receives a TEE-signed record proving the agent was automatically stopped.
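
The rolling-window behaviour described above can be modelled in a few lines. This is a simplified, self-contained sketch and not the runtime's code: the class name `RollingDenyWindow`, its methods, and the injectable `clock` parameter are hypothetical and exist only so the example runs without a gateway.

```python
import time
from collections import deque
from typing import Callable


class RollingDenyWindow:
    """Toy model of a rolling deny-rate window for one agent (hypothetical names)."""

    def __init__(self, window_seconds: float, threshold: float, min_calls: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window_seconds = window_seconds
        self.threshold = threshold
        self.min_calls = min_calls
        self.clock = clock
        self.events: deque = deque()  # items are (timestamp, is_deny)

    def record(self, is_deny: bool) -> None:
        self.events.append((self.clock(), is_deny))

    def tripped(self) -> bool:
        # Drop events older than the window, then compare the deny fraction.
        cutoff = self.clock() - self.window_seconds
        while self.events and self.events[0][0] < cutoff:
            self.events.popleft()
        if len(self.events) < self.min_calls:
            return False
        denies = sum(1 for _, is_deny in self.events if is_deny)
        return denies / len(self.events) >= self.threshold


# Drive the model with a fake clock so the behaviour is deterministic.
now = [0.0]
window = RollingDenyWindow(300, 0.9, 10, clock=lambda: now[0])
for _ in range(9):
    window.record(is_deny=True)
print(window.tripped())  # False: only 9 samples, below min_calls
window.record(is_deny=True)
print(window.tripped())  # True: 10 of 10 calls denied
now[0] = 301.0
print(window.tripped())  # False: every event has aged out of the window
```

The fake clock shows the time dimension: the same identity stops qualifying once its events are older than the window, which matters for deciding how long `window_seconds` should be.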

---

## Configure the kill switch

Add a `kill_switch` block to `cmcp-config.yaml`:

```yaml
kill_switch:
  enabled: true
  window_seconds: 300        # rolling window: 5 minutes
  deny_rate_threshold: 0.9   # trip at a 90% deny rate
  min_calls: 10              # require at least 10 calls before evaluating
```

All fields have defaults. Leaving `enabled: false` (the default) disables evaluation, so the block can stay in the file while the switch is off.

| Field | Default | Description |
|---|---|---|
| `enabled` | `false` | Master switch. Set to `true` to activate. |
| `window_seconds` | `300` | Rolling window length in seconds. |
| `deny_rate_threshold` | `0.9` | Fraction of calls in the window that must be denied to trip, in the range (0, 1]. |
| `min_calls` | `10` | Minimum call count in the window before evaluation starts. |

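With `deny_rate_threshold: 0.9` and `min_calls: 10`, the kill switch fires once an agent has at least 10 calls in the 5-minute window and at least 90% of them were denied. Calls enter the window when their session closes, so the check runs at session close rather than after each call.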
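
To make the threshold arithmetic concrete, the sketch below evaluates the trip condition for a few call mixes. The helper `would_trip` is hypothetical and written only for illustration; it applies the minimum sample count first, then checks whether the deny fraction is at or above the threshold.

```python
def would_trip(allowed: int, denied: int,
               threshold: float = 0.9, min_calls: int = 10) -> bool:
    """Return True if this call mix would trip the kill switch (illustrative helper)."""
    total = allowed + denied
    if total < min_calls:
        return False  # not enough samples to evaluate
    return denied / total >= threshold


print(would_trip(allowed=1, denied=11))  # True: 11/12 is about 0.917
print(would_trip(allowed=1, denied=9))   # True: 9/10 is exactly at the threshold
print(would_trip(allowed=2, denied=8))   # False: 8/10 is below the threshold
print(would_trip(allowed=0, denied=9))   # False: only 9 calls, below min_calls
```

Note the last case: a 100% deny rate still does not trip the switch until `min_calls` is met, which protects agents with very short sessions from being blocked on a handful of denies.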

---

## Run a session that trips the kill switch

Start the gateway with the kill switch enabled and an Agent Manifest bound:

```yaml
attestation:
  provider: sev-snp
  enforcement_mode: enforcing
agent_manifest:
  path: agent.manifest.json
  trust_anchor_path: trust-anchor.pem
  authenticated_subject: spiffe://example.com/agent/procurement-bot
kill_switch:
  enabled: true
  window_seconds: 300
  deny_rate_threshold: 0.9
  min_calls: 10
```

```bash
export CMCP_BEARER_TOKEN="$(openssl rand -hex 32)"
cmcp start --config cmcp-config.yaml
```

Run a session where the agent makes mostly denied calls. When the session closes, cMCP evaluates the rolling window and, if the deny rate has reached the threshold, marks the claim:

```json
{
  "gateway": {
    "session_id": "9e1b4c3a-...",
    "kill_switch_triggered": true,
    "call_summary": {
      "tool_calls_total": 12,
      "tool_calls_allowed": 1,
      "tool_calls_denied": 11
    }
  }
}
```

The next session attempt from `spiffe://example.com/agent/procurement-bot` returns:

```
HTTP 403 KILL_SWITCH_TRIPPED
{
  "error": "KILL_SWITCH_TRIPPED",
  "detail": "spiffe://example.com/agent/procurement-bot"
}
```
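
A client that receives this response should stop retrying, because the block lasts until an operator intervenes. The following is a minimal sketch of how a caller might recognise the rejection. It assumes the body is the JSON object shown above; the helper name `is_kill_switch_rejection` is hypothetical and not part of any cMCP client library.

```python
import json


def is_kill_switch_rejection(status_code: int, body: str) -> bool:
    """Detect a kill-switch rejection from an HTTP status and response body."""
    if status_code != 403:
        return False
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return False
    return isinstance(payload, dict) and payload.get("error") == "KILL_SWITCH_TRIPPED"


body = '{"error": "KILL_SWITCH_TRIPPED", "detail": "spiffe://example.com/agent/procurement-bot"}'
if is_kill_switch_rejection(403, body):
    blocked_identity = json.loads(body)["detail"]
    print(f"Stop retrying; an operator must unblock {blocked_identity}")
```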

---

## Verify the kill switch TRACE claim

```python
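from cmcp_verify import verify_trace_claim, ApprovedHashes

# `claim` is the TRACE claim for the closed session, already parsed into a dict.
approved = ApprovedHashes(
    policy_bundle_hash="sha256:<bundle-hash>",
    tool_catalog_hash="sha256:<catalog-hash>",
)
result = verify_trace_claim(claim, approved)

if result.status == "verified":
    # Trust the flag only after the claim itself has verified.
    # Claims produced before this field existed will not carry it.
    if claim["gateway"].get("kill_switch_triggered", False):
        print("Agent was automatically blocked; hardware-attested enforcement confirmed.")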
```

A verifier running offline — with no connection to the cMCP gateway or to Opaque — can confirm that:

- The kill switch fired in this session (`kill_switch_triggered: true`)
- The policy that caused the denies is recorded by hash in `trace.policy.bundle_hash`
- The audit chain tip in `trace.tool_transcript.hash` covers all deny decisions
- The TEE measurement in `trace.runtime.measurement` confirms the unmodified workload produced the claim

---

## Unblock an agent identity

The kill switch is a process-lifetime block — it persists as long as the gateway process is running. To unblock, restart the gateway. This clears all in-memory state including the blocked identity set and the rolling window.

For a manual operator override without restart, cMCP exposes an operator endpoint (requires `CMCP_BEARER_TOKEN`):

```bash
curl -X DELETE https://localhost:8443/admin/kill-switch/spiffe%3A%2F%2Fexample.com%2Fagent%2Fprocurement-bot \
-H "Authorization: Bearer $CMCP_BEARER_TOKEN"
```

This calls `KillSwitchEvaluator.unblock()` — clearing the block flag and all rolling window events for that identity. The action is logged to the audit chain.
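
The SPIFFE URI in the path must be percent-encoded, including its slashes, so that it is treated as a single path segment. A minimal sketch of building that path with the standard library follows; the helper name `kill_switch_admin_path` is hypothetical, and the `/admin/kill-switch/` prefix is taken from the `curl` example above.

```python
from urllib.parse import quote, unquote


def kill_switch_admin_path(agent_id: str) -> str:
    """Build the admin path for an agent identity, encoding every reserved character."""
    # safe="" forces "/" and ":" to be encoded as well.
    return "/admin/kill-switch/" + quote(agent_id, safe="")


path = kill_switch_admin_path("spiffe://example.com/agent/procurement-bot")
print(path)
# /admin/kill-switch/spiffe%3A%2F%2Fexample.com%2Fagent%2Fprocurement-bot
```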

---

## What counts as a deny

Both `deny` and `advisory_deny` policy decisions count toward the deny rate. A `fault` (tool error) does not count — it indicates a tool-side failure, not a policy enforcement event.

| Decision | Counted as deny? |
|---|---|
| `allow` | No |
| `deny` | Yes |
| `advisory_deny` | Yes |
| `fault` | No |
| `redact` | No |
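
The table can be expressed as a small classifier. This is an illustrative sketch only: the names `DENY_DECISIONS` and `count_denies` are hypothetical, and the decision strings are the ones listed in the table above.

```python
# Decisions that count toward the deny rate, per the table above.
DENY_DECISIONS = frozenset({"deny", "advisory_deny"})


def count_denies(decisions: list[str]) -> int:
    """Count how many decisions in a session count toward the deny rate."""
    return sum(1 for decision in decisions if decision in DENY_DECISIONS)


session = ["deny", "advisory_deny", "fault", "allow", "redact", "deny"]
print(count_denies(session))  # 3
```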

---

## Sovereign context

For UAE federal ministries and other sovereign deployments, `kill_switch_triggered: true` in a TRACE claim is the answer to "what happens when an agent goes rogue." The proof is hardware-rooted:

- The TEE signs the claim — the cloud operator and the ministry IT team cannot produce this artifact for a different outcome
- The audit chain entry records the agent identity, the deny rate window, and the trigger timestamp
- The claim is verifiable offline by the federal oversight body without calling back to any Opaque service

This closes the regulatory gap that a log file cannot close: a log entry is something the operator controls. A TEE-signed TRACE claim with `kill_switch_triggered: true` is not.

---

## Summary

You configured the rolling-window kill switch, ran a session that tripped the threshold, and verified that the closing TRACE claim carries `gateway.kill_switch_triggered: true`. Subsequent sessions from the flagged agent identity are rejected with `KILL_SWITCH_TRIPPED (403)`. The hardware-signed artifact is verifiable by any regulator offline.

Related tutorials: [TEE attestation](./tee-attestation.md) — hardware-backing the TRACE claim that carries `kill_switch_triggered`. [Verify a TRACE claim](./verifying-a-trace-claim.md) — checking `kill_switch_triggered` as part of offline verification.
1 change: 1 addition & 0 deletions mkdocs.yml
Expand Up @@ -127,6 +127,7 @@ nav:
    - TEE attestation: docs/tutorials/tee-attestation.md
    - Multi-tenant deployment: docs/tutorials/multi-tenant-config.md
    - Response inspection: docs/tutorials/response-inspection.md
    - AGT SRE kill switch: docs/tutorials/kill-switch.md
  - Specification:
    - Overview: docs/SPEC.md
    - Component Model: docs/spec/component-model.md
Expand Down
4 changes: 4 additions & 0 deletions schemas/trace-claim.schema.json
Expand Up @@ -250,6 +250,10 @@
        "catalog_exceptions": {
          "type": "array",
          "items": { "type": "object" }
        },
        "kill_switch_triggered": {
          "type": "boolean",
          "description": "True when the AGT SRE kill switch fired for this session's agent identity. Future sessions from the same identity will be rejected."
        }
      }
    },
Expand Down
3 changes: 3 additions & 0 deletions src/cmcp_runtime/audit/trace_claim.py
Expand Up @@ -217,6 +217,7 @@ class GatewayAddenda(BaseModel):
    catalog_exceptions: list[dict[str, str]] = Field(default_factory=list)
    call_log_summary: CallLogSummary | None = None
    agent_identity: AgentIdentityOut | None = None
    kill_switch_triggered: bool = False


class RuntimeClaim(BaseModel):
Expand Down Expand Up @@ -354,6 +355,7 @@ def generate_trace_claim(
    agent_identity: AgentIdentityInfo | None = None,
    sequence_number: int = 1,
    prev_claim_hash: str | None = None,
    kill_switch_triggered: bool = False,
    do_sign: bool = True,
) -> RuntimeClaim:
    """Generate a RuntimeClaim from session data, validate it via Pydantic, and optionally sign it.
Expand Down Expand Up @@ -414,6 +416,7 @@ def generate_trace_claim(
            attestation_validity_seconds=attestation_report.attestation_validity_seconds,
            attestation_stale=attestation_stale,
            catalog_exceptions=catalog_exceptions or [],
            kill_switch_triggered=kill_switch_triggered,
            call_log_summary=call_log_summary,
            agent_identity=(
                AgentIdentityOut(
Expand Down
45 changes: 45 additions & 0 deletions src/cmcp_runtime/config.py
Expand Up @@ -38,6 +38,14 @@ class StalenessPolicy(StrEnum):
    WARN_ONLY = "warn_only"


@dataclass
class KillSwitchConfig:
    enabled: bool = False
    window_seconds: int = 300
    deny_rate_threshold: float = 0.9
    min_calls: int = 10


@dataclass
class AttestationConfig:
    provider: TEEProvider = TEEProvider.AUTO
Expand All @@ -58,6 +66,7 @@ class AgentManifestConfig:
class Config:
    attestation: AttestationConfig = field(default_factory=AttestationConfig)
    agent_manifest: AgentManifestConfig = field(default_factory=AgentManifestConfig)
    kill_switch: KillSwitchConfig = field(default_factory=KillSwitchConfig)
    policy_bundle_path: str = "policy/"
    catalog_path: str = "catalog.json"
    listen_addr: str = "0.0.0.0:8443"
Expand All @@ -71,13 +80,20 @@ class Config:
_KNOWN_TOP_KEYS = {
    "attestation",
    "agent_manifest",
    "kill_switch",
    "policy_bundle_path",
    "catalog_path",
    "listen_addr",
    "max_response_size_bytes",
    "policy_reload_interval_seconds",
    "audit_db_path",
}
_KNOWN_KILL_SWITCH_KEYS = {
    "enabled",
    "window_seconds",
    "deny_rate_threshold",
    "min_calls",
}
_KNOWN_ATTEST_KEYS = {
    "provider",
    "enforcement_mode",
Expand Down Expand Up @@ -145,6 +161,29 @@ def load_config(path: str) -> Config:
f"'{key}'. Valid keys: {sorted(_KNOWN_AGENT_MANIFEST_KEYS)}"
)

    ks_raw = raw.get("kill_switch", {})
    if ks_raw is None:
        ks_raw = {}
    if not isinstance(ks_raw, dict):
        raise ConfigError("'kill_switch' must be a mapping")
    for key in ks_raw:
        if key not in _KNOWN_KILL_SWITCH_KEYS:
            raise ConfigError(
                f"Unknown kill_switch key '{key}'. Valid keys: {sorted(_KNOWN_KILL_SWITCH_KEYS)}"
            )
    ks_enabled = ks_raw.get("enabled", False)
    if not isinstance(ks_enabled, bool):
        raise ConfigError("kill_switch.enabled must be a boolean")
    # bool is a subclass of int, so reject it explicitly in the numeric checks
    # below; otherwise `window_seconds: true` would be accepted as 1.
    ks_window = ks_raw.get("window_seconds", 300)
    if isinstance(ks_window, bool) or not isinstance(ks_window, int) or ks_window <= 0:
        raise ConfigError("kill_switch.window_seconds must be a positive integer")
    ks_threshold = ks_raw.get("deny_rate_threshold", 0.9)
    if (
        isinstance(ks_threshold, bool)
        or not isinstance(ks_threshold, int | float)
        or not (0.0 < ks_threshold <= 1.0)
    ):
        raise ConfigError("kill_switch.deny_rate_threshold must be a float in (0, 1]")
    ks_min_calls = ks_raw.get("min_calls", 10)
    if isinstance(ks_min_calls, bool) or not isinstance(ks_min_calls, int) or ks_min_calls <= 0:
        raise ConfigError("kill_switch.min_calls must be a positive integer")

    try:
        provider = TEEProvider(attest_raw.get("provider", "auto"))
    except ValueError as err:
Expand Down Expand Up @@ -222,6 +261,12 @@ def load_config(path: str) -> Config:
            trust_anchor_path=trust_anchor_path,
            authenticated_subject=authenticated_subject,
        ),
        kill_switch=KillSwitchConfig(
            enabled=ks_enabled,
            window_seconds=ks_window,
            deny_rate_threshold=float(ks_threshold),
            min_calls=ks_min_calls,
        ),
        policy_bundle_path=policy_bundle_path,
        catalog_path=catalog_path,
        listen_addr=raw.get("listen_addr", "0.0.0.0:8443"),
Expand Down
7 changes: 7 additions & 0 deletions src/cmcp_runtime/errors.py
Expand Up @@ -126,3 +126,10 @@ class ConfigError(CMCPError):
class ClaimValidationError(CMCPError):
    code = "CLAIM_VALIDATION_ERROR"
    http_status = 500


class KillSwitchTripped(CMCPError):
    """Raised when a new session is rejected because the agent identity has tripped the kill switch."""

    code = "KILL_SWITCH_TRIPPED"
    http_status = 403
74 changes: 74 additions & 0 deletions src/cmcp_runtime/kill_switch.py
@@ -0,0 +1,74 @@
"""AGT SRE kill switch evaluator — implements issue #341."""

from __future__ import annotations

import time
from collections import defaultdict, deque
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from cmcp_runtime.config import KillSwitchConfig


class KillSwitchEvaluator:
    """Rolling-window deny-rate evaluator for per-agent-identity enforcement.

    When the deny rate for a registered agent identity reaches
    `deny_rate_threshold` over the rolling `window_seconds` window (with at
    least `min_calls` events), the identity is flagged. The TRACE claim for the
    session that trips the threshold carries `kill_switch_triggered=true`, which
    is hardware-attested evidence of automated enforcement. Subsequent
    `create_session()` calls for the same agent identity raise `KillSwitchTripped`.

    Thread-safety: this evaluator is not thread-safe by itself. The caller
    (SessionManager) must serialise calls if sessions are closed concurrently.
    In practice the gateway processes sessions on an asyncio event loop and
    close_session() is called synchronously, so no lock is needed.
    """

    def __init__(self, config: KillSwitchConfig) -> None:
        self._config = config
        # agent_id -> deque of (monotonic_time, is_deny: bool)
        self._events: dict[str, deque[tuple[float, bool]]] = defaultdict(deque)
        self._blocked: set[str] = set()

    def record_calls(self, agent_id: str, *, allowed: int, denied: int) -> None:
        """Record call outcomes from a just-closed session into the rolling window."""
        # Every call in the session is stamped with the session-close time.
        now = time.monotonic()
        q = self._events[agent_id]
        for _ in range(allowed):
            q.append((now, False))
        for _ in range(denied):
            q.append((now, True))
        self._prune(agent_id)

    def evaluate(self, agent_id: str) -> bool:
        """Return True and flag the agent if the deny rate has reached the threshold."""
        if not self._config.enabled:
            return False
        self._prune(agent_id)
        q = self._events[agent_id]
        total = len(q)
        if total < self._config.min_calls:
            return False
        deny_count = sum(1 for _, is_deny in q if is_deny)
        rate = deny_count / total
        if rate >= self._config.deny_rate_threshold:
            self._blocked.add(agent_id)
            return True
        return False

    def is_blocked(self, agent_id: str) -> bool:
        """Return True if this agent identity has previously tripped the kill switch."""
        return agent_id in self._blocked

    def unblock(self, agent_id: str) -> None:
        """Manually unblock an agent identity. Clears its event history too."""
        self._blocked.discard(agent_id)
        self._events.pop(agent_id, None)

    def _prune(self, agent_id: str) -> None:
        """Drop events that have aged out of the rolling window."""
        cutoff = time.monotonic() - self._config.window_seconds
        q = self._events[agent_id]
        while q and q[0][0] < cutoff:
            q.popleft()