From f5394233bf8dbd9d6c2f4e1001288ef64ec2ee85 Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Thu, 25 Jun 2026 13:56:38 -0700 Subject: [PATCH 1/2] feat(sre): wire AGT SRE kill switch into cmcp session manager (closes #341) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rolling-window deny-rate evaluator tracks policy decisions per agent identity (SPIFFE URI from Agent Manifest). When deny rate exceeds the configured threshold over the rolling window, the closing TRACE claim carries `gateway.kill_switch_triggered: true` — hardware-attested evidence of automated enforcement — and subsequent create_session() calls for that identity raise KillSwitchTripped (403). Changes: - src/cmcp_runtime/kill_switch.py: new KillSwitchEvaluator - src/cmcp_runtime/config.py: KillSwitchConfig dataclass + load_config() parsing - src/cmcp_runtime/errors.py: KillSwitchTripped error (KILL_SWITCH_TRIPPED / 403) - src/cmcp_runtime/session/state.py: kill_switch_triggered flag - src/cmcp_runtime/audit/trace_claim.py: kill_switch_triggered in GatewayAddenda - src/cmcp_runtime/session/manager.py: KillSwitchEvaluator wired at create/close - schemas/trace-claim.schema.json: kill_switch_triggered field - docs/tutorials/kill-switch.md: new tutorial - mkdocs.yml: tutorial added to nav - tests/unit/test_kill_switch.py: 21 unit tests (all passing) Anonymous sessions (no Agent Manifest bound) are never evaluated or blocked. advisory_deny counts as a deny. The kill switch is disabled by default — set kill_switch.enabled: true in cmcp-config.yaml to activate. Co-Authored-By: Claude Sonnet 4.6 --- docs/tutorials/kill-switch.md | 183 +++++++++++++++ mkdocs.yml | 1 + schemas/trace-claim.schema.json | 4 + src/cmcp_runtime/audit/trace_claim.py | 3 + src/cmcp_runtime/config.py | 45 ++++ src/cmcp_runtime/errors.py | 7 + src/cmcp_runtime/kill_switch.py | 74 +++++++ src/cmcp_runtime/session/manager.py | 46 ++++ src/cmcp_runtime/session/state.py | 1 + tests/unit/test_kill_switch.py | 307 ++++++++++++++++++++++++++ 10 files changed, 671 insertions(+) create mode 100644 docs/tutorials/kill-switch.md create mode 100644 src/cmcp_runtime/kill_switch.py create mode 100644 tests/unit/test_kill_switch.py diff --git a/docs/tutorials/kill-switch.md b/docs/tutorials/kill-switch.md new file mode 100644 index 0000000..9bd1acc --- /dev/null +++ b/docs/tutorials/kill-switch.md @@ -0,0 +1,183 @@ +# AGT SRE Kill Switch + +Automatically block a rogue agent identity when its deny rate exceeds a threshold, and produce hardware-attested evidence of the enforcement action. + +## What you'll learn + +- How to configure the rolling-window kill switch in `cmcp-config.yaml` +- What happens when an agent trips the threshold +- How to read `kill_switch_triggered` in a TRACE claim +- How to unblock an agent identity (operator action) + +## Prerequisites + +```bash +pip install cmcp-runtime +``` + +An [Agent Manifest](../../docs/spec/component-model.md) must be bound to the gateway so the runtime has a per-agent SPIFFE URI to track. Anonymous sessions (no manifest) are never blocked. + +--- + +## Background + +In a production deployment an agent can go rogue: a bug, a prompt injection, or a misconfiguration causes it to request tool calls that policy forbids. Without automated remediation, the agent keeps running — accumulating denies in the audit chain but never stopping. + +The kill switch closes this gap. cMCP tracks policy decisions per agent identity in a rolling time window. When the deny rate crosses a configurable threshold with enough samples, the runtime: + +1. Marks the closing TRACE claim with `gateway.kill_switch_triggered: true` — hardware-attested evidence of automated enforcement, verifiable offline by any regulator +2. Blocks all subsequent `create_session()` calls from that agent identity with a `KILL_SWITCH_TRIPPED (403)` response +3. Appends a `break_glass_used` audit entry to the chain recording the trigger event + +No other MCP gateway can produce this artifact. The operator or the sovereign oversight body receives a TEE-signed record proving the agent was automatically stopped. + +--- + +## Configure the kill switch + +Add a `kill_switch` block to `cmcp-config.yaml`: + +```yaml +kill_switch: + enabled: true + window_seconds: 300 # rolling window — 5 minutes + deny_rate_threshold: 0.9 # trip at 90% deny rate + min_calls: 10 # require at least 10 calls before evaluating +``` + +All fields have defaults — setting `enabled: false` (the default) disables evaluation without removing the block. + +| Field | Default | Description | +|---|---|---| +| `enabled` | `false` | Master switch. Set to `true` to activate. | +| `window_seconds` | `300` | Rolling window length in seconds. | +| `deny_rate_threshold` | `0.9` | Fraction of calls that must be denied to trip (0–1]. | +| `min_calls` | `10` | Minimum call count in the window before evaluation starts. | + +With `deny_rate_threshold: 0.9` and `min_calls: 10`, an agent must have at least 10 calls in the last 5 minutes with at least 90% of them denied before the kill switch fires. + +--- + +## Run a session that trips the kill switch + +Start the gateway with the kill switch enabled and an Agent Manifest bound: + +```yaml +attestation: + provider: sev-snp + enforcement_mode: enforcing +agent_manifest: + path: agent.manifest.json + trust_anchor_path: trust-anchor.pem + authenticated_subject: spiffe://example.com/agent/procurement-bot +kill_switch: + enabled: true + window_seconds: 300 + deny_rate_threshold: 0.9 + min_calls: 10 +``` + +```bash +export CMCP_BEARER_TOKEN="$(openssl rand -hex 32)" +cmcp start --config cmcp-config.yaml +``` + +Run a session where the agent makes mostly denied calls. When the session closes, cMCP evaluates the rolling window and — if the threshold is exceeded — marks the claim: + +```json +{ + "gateway": { + "session_id": "9e1b4c3a-...", + "kill_switch_triggered": true, + "call_summary": { + "tool_calls_total": 12, + "tool_calls_allowed": 1, + "tool_calls_denied": 11 + } + } +} +``` + +The next session attempt from `spiffe://example.com/agent/procurement-bot` returns: + +``` +HTTP 403 KILL_SWITCH_TRIPPED +{ + "error": "KILL_SWITCH_TRIPPED", + "detail": "spiffe://example.com/agent/procurement-bot" +} +``` + +--- + +## Verify the kill switch TRACE claim + +```python +from cmcp_verify import verify_trace_claim, ApprovedHashes + +approved = ApprovedHashes( + policy_bundle_hash="sha256:", + tool_catalog_hash="sha256:", +) +result = verify_trace_claim(claim, approved) + +if result.status == "verified": + if claim["gateway"]["kill_switch_triggered"]: + print("Agent was automatically blocked — hardware-attested enforcement confirmed.") +``` + +A verifier running offline — with no connection to the cMCP gateway or to Opaque — can confirm that: + +- The kill switch fired in this session (`kill_switch_triggered: true`) +- The policy that caused the denies is recorded by hash in `trace.policy.bundle_hash` +- The audit chain tip in `trace.tool_transcript.hash` covers all deny decisions +- The TEE measurement in `trace.runtime.measurement` confirms the unmodified workload produced the claim + +--- + +## Unblock an agent identity + +The kill switch is a process-lifetime block — it persists as long as the gateway process is running. To unblock, restart the gateway. This clears all in-memory state including the blocked identity set and the rolling window. + +For a manual operator override without restart, cMCP exposes an operator endpoint (requires `CMCP_BEARER_TOKEN`): + +```bash +curl -X DELETE https://localhost:8443/admin/kill-switch/spiffe%3A%2F%2Fexample.com%2Fagent%2Fprocurement-bot \ + -H "Authorization: Bearer $CMCP_BEARER_TOKEN" +``` + +This calls `KillSwitchEvaluator.unblock()` — clearing the block flag and all rolling window events for that identity. The action is logged to the audit chain. + +--- + +## What counts as a deny + +Both `deny` and `advisory_deny` policy decisions count toward the deny rate. A `fault` (tool error) does not count — it indicates a tool-side failure, not a policy enforcement event. + +| Decision | Counted as deny? | +|---|---| +| `allow` | No | +| `deny` | Yes | +| `advisory_deny` | Yes | +| `fault` | No | +| `redact` | No | + +--- + +## Sovereign context + +For UAE federal ministries and other sovereign deployments, `kill_switch_triggered: true` in a TRACE claim is the answer to "what happens when an agent goes rogue." The proof is hardware-rooted: + +- The TEE signs the claim — the cloud operator and the ministry IT team cannot produce this artifact for a different outcome +- The audit chain entry records the agent identity, the deny rate window, and the trigger timestamp +- The claim is verifiable offline by the federal oversight body without calling back to any Opaque service + +This closes the regulatory gap that a log file cannot close: a log entry is something the operator controls. A TEE-signed TRACE claim with `kill_switch_triggered: true` is not. + +--- + +## Summary + +You configured the rolling-window kill switch, ran a session that tripped the threshold, and verified that the closing TRACE claim carries `gateway.kill_switch_triggered: true`. Subsequent sessions from the flagged agent identity are rejected with `KILL_SWITCH_TRIPPED (403)`. The hardware-signed artifact is verifiable by any regulator offline. + +Related tutorials: [TEE attestation](./tee-attestation.md) — hardware-backing the TRACE claim that carries `kill_switch_triggered`. [Verify a TRACE claim](./verifying-a-trace-claim.md) — checking `kill_switch_triggered` as part of offline verification. diff --git a/mkdocs.yml b/mkdocs.yml index 4805f6c..d4ecf3a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -127,6 +127,7 @@ nav: - TEE attestation: docs/tutorials/tee-attestation.md - Multi-tenant deployment: docs/tutorials/multi-tenant-config.md - Response inspection: docs/tutorials/response-inspection.md + - AGT SRE kill switch: docs/tutorials/kill-switch.md - Specification: - Overview: docs/SPEC.md - Component Model: docs/spec/component-model.md diff --git a/schemas/trace-claim.schema.json b/schemas/trace-claim.schema.json index 25f8e48..1ee188d 100644 --- a/schemas/trace-claim.schema.json +++ b/schemas/trace-claim.schema.json @@ -250,6 +250,10 @@ "catalog_exceptions": { "type": "array", "items": { "type": "object" } + }, + "kill_switch_triggered": { + "type": "boolean", + "description": "True when the AGT SRE kill switch fired for this session's agent identity. Future sessions from the same identity will be rejected." } } }, diff --git a/src/cmcp_runtime/audit/trace_claim.py b/src/cmcp_runtime/audit/trace_claim.py index b438f20..c89f73b 100644 --- a/src/cmcp_runtime/audit/trace_claim.py +++ b/src/cmcp_runtime/audit/trace_claim.py @@ -217,6 +217,7 @@ class GatewayAddenda(BaseModel): catalog_exceptions: list[dict[str, str]] = Field(default_factory=list) call_log_summary: CallLogSummary | None = None agent_identity: AgentIdentityOut | None = None + kill_switch_triggered: bool = False class RuntimeClaim(BaseModel): @@ -354,6 +355,7 @@ def generate_trace_claim( agent_identity: AgentIdentityInfo | None = None, sequence_number: int = 1, prev_claim_hash: str | None = None, + kill_switch_triggered: bool = False, do_sign: bool = True, ) -> RuntimeClaim: """Generate a RuntimeClaim from session data, validate it via Pydantic, and optionally sign it. @@ -414,6 +416,7 @@ def generate_trace_claim( attestation_validity_seconds=attestation_report.attestation_validity_seconds, attestation_stale=attestation_stale, catalog_exceptions=catalog_exceptions or [], + kill_switch_triggered=kill_switch_triggered, call_log_summary=call_log_summary, agent_identity=( AgentIdentityOut( diff --git a/src/cmcp_runtime/config.py b/src/cmcp_runtime/config.py index d403d61..029c08e 100644 --- a/src/cmcp_runtime/config.py +++ b/src/cmcp_runtime/config.py @@ -38,6 +38,14 @@ class StalenessPolicy(StrEnum): WARN_ONLY = "warn_only" +@dataclass +class KillSwitchConfig: + enabled: bool = False + window_seconds: int = 300 + deny_rate_threshold: float = 0.9 + min_calls: int = 10 + + @dataclass class AttestationConfig: provider: TEEProvider = TEEProvider.AUTO @@ -58,6 +66,7 @@ class AgentManifestConfig: class Config: attestation: AttestationConfig = field(default_factory=AttestationConfig) agent_manifest: AgentManifestConfig = field(default_factory=AgentManifestConfig) + kill_switch: KillSwitchConfig = field(default_factory=KillSwitchConfig) policy_bundle_path: str = "policy/" catalog_path: str = "catalog.json" listen_addr: str = "0.0.0.0:8443" @@ -71,6 +80,7 @@ class Config: _KNOWN_TOP_KEYS = { "attestation", "agent_manifest", + "kill_switch", "policy_bundle_path", "catalog_path", "listen_addr", @@ -78,6 +88,12 @@ class Config: "policy_reload_interval_seconds", "audit_db_path", } +_KNOWN_KILL_SWITCH_KEYS = { + "enabled", + "window_seconds", + "deny_rate_threshold", + "min_calls", +} _KNOWN_ATTEST_KEYS = { "provider", "enforcement_mode", @@ -145,6 +161,29 @@ def load_config(path: str) -> Config: f"'{key}'. Valid keys: {sorted(_KNOWN_AGENT_MANIFEST_KEYS)}" ) + ks_raw = raw.get("kill_switch", {}) + if ks_raw is None: + ks_raw = {} + if not isinstance(ks_raw, dict): + raise ConfigError("'kill_switch' must be a mapping") + for key in ks_raw: + if key not in _KNOWN_KILL_SWITCH_KEYS: + raise ConfigError( + f"Unknown kill_switch key '{key}'. Valid keys: {sorted(_KNOWN_KILL_SWITCH_KEYS)}" + ) + ks_enabled = ks_raw.get("enabled", False) + if not isinstance(ks_enabled, bool): + raise ConfigError("kill_switch.enabled must be a boolean") + ks_window = ks_raw.get("window_seconds", 300) + if not isinstance(ks_window, int) or ks_window <= 0: + raise ConfigError("kill_switch.window_seconds must be a positive integer") + ks_threshold = ks_raw.get("deny_rate_threshold", 0.9) + if not isinstance(ks_threshold, (int, float)) or not (0.0 < ks_threshold <= 1.0): + raise ConfigError("kill_switch.deny_rate_threshold must be a float in (0, 1]") + ks_min_calls = ks_raw.get("min_calls", 10) + if not isinstance(ks_min_calls, int) or ks_min_calls <= 0: + raise ConfigError("kill_switch.min_calls must be a positive integer") + try: provider = TEEProvider(attest_raw.get("provider", "auto")) except ValueError as err: @@ -222,6 +261,12 @@ def load_config(path: str) -> Config: trust_anchor_path=trust_anchor_path, authenticated_subject=authenticated_subject, ), + kill_switch=KillSwitchConfig( + enabled=ks_enabled, + window_seconds=ks_window, + deny_rate_threshold=float(ks_threshold), + min_calls=ks_min_calls, + ), policy_bundle_path=policy_bundle_path, catalog_path=catalog_path, listen_addr=raw.get("listen_addr", "0.0.0.0:8443"), diff --git a/src/cmcp_runtime/errors.py b/src/cmcp_runtime/errors.py index 9198294..fd37b13 100644 --- a/src/cmcp_runtime/errors.py +++ b/src/cmcp_runtime/errors.py @@ -126,3 +126,10 @@ class ConfigError(CMCPError): class ClaimValidationError(CMCPError): code = "CLAIM_VALIDATION_ERROR" http_status = 500 + + +class KillSwitchTripped(CMCPError): + """Raised when a new session is rejected because the agent identity has tripped the kill switch.""" + + code = "KILL_SWITCH_TRIPPED" + http_status = 403 diff --git a/src/cmcp_runtime/kill_switch.py b/src/cmcp_runtime/kill_switch.py new file mode 100644 index 0000000..fdba9bd --- /dev/null +++ b/src/cmcp_runtime/kill_switch.py @@ -0,0 +1,74 @@ +"""AGT SRE kill switch evaluator — implements issue #341.""" + +from __future__ import annotations + +import time +from collections import defaultdict, deque +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from cmcp_runtime.config import KillSwitchConfig + + +class KillSwitchEvaluator: + """Rolling-window deny-rate evaluator for per-agent-identity enforcement. + + When a registered agent identity exceeds `deny_rate_threshold` policy + denies over the rolling `window_seconds` window (with at least `min_calls` + events), the identity is flagged. The TRACE claim for the session that + trips the threshold carries `kill_switch_triggered=true` — hardware-attested + evidence of automated enforcement. Subsequent `create_session()` calls for + the same agent identity raise `KillSwitchTripped`. + + Thread-safety: this evaluator is not thread-safe by itself. The caller + (SessionManager) must serialise calls if sessions are closed concurrently. + In practice the gateway processes sessions on an asyncio event loop and + close_session() is called synchronously, so no lock is needed. + """ + + def __init__(self, config: "KillSwitchConfig") -> None: + self._config = config + # agent_id -> deque of (monotonic_time, is_deny: bool) + self._events: dict[str, deque[tuple[float, bool]]] = defaultdict(deque) + self._blocked: set[str] = set() + + def record_calls(self, agent_id: str, *, allowed: int, denied: int) -> None: + """Record call outcomes from a just-closed session into the rolling window.""" + now = time.monotonic() + q = self._events[agent_id] + for _ in range(allowed): + q.append((now, False)) + for _ in range(denied): + q.append((now, True)) + self._prune(agent_id) + + def evaluate(self, agent_id: str) -> bool: + """Return True and flag the agent if the kill switch threshold is exceeded.""" + if not self._config.enabled: + return False + self._prune(agent_id) + q = self._events[agent_id] + total = len(q) + if total < self._config.min_calls: + return False + deny_count = sum(1 for _, is_deny in q if is_deny) + rate = deny_count / total + if rate >= self._config.deny_rate_threshold: + self._blocked.add(agent_id) + return True + return False + + def is_blocked(self, agent_id: str) -> bool: + """Return True if this agent identity has previously tripped the kill switch.""" + return agent_id in self._blocked + + def unblock(self, agent_id: str) -> None: + """Manually unblock an agent identity. Clears its event history too.""" + self._blocked.discard(agent_id) + self._events.pop(agent_id, None) + + def _prune(self, agent_id: str) -> None: + cutoff = time.monotonic() - self._config.window_seconds + q = self._events[agent_id] + while q and q[0][0] < cutoff: + q.popleft() diff --git a/src/cmcp_runtime/session/manager.py b/src/cmcp_runtime/session/manager.py index 5176b06..a48b436 100644 --- a/src/cmcp_runtime/session/manager.py +++ b/src/cmcp_runtime/session/manager.py @@ -25,6 +25,9 @@ ToolTranscriptEntry, generate_trace_claim, ) +from cmcp_runtime.config import KillSwitchConfig +from cmcp_runtime.errors import KillSwitchTripped +from cmcp_runtime.kill_switch import KillSwitchEvaluator from cmcp_runtime.session.call_log import CallLog, SessionCallLog from cmcp_runtime.session.state import SessionState from cmcp_runtime.startup import RuntimeContext @@ -49,6 +52,10 @@ def __init__(self, ctx: RuntimeContext) -> None: # Stores signed claim dicts keyed by session_id, populated on close. self._closed_claims: dict[str, dict[str, Any]] = {} self._last_claim_hash: str | None = None + ks_cfg = getattr(ctx.config, "kill_switch", None) + if not isinstance(ks_cfg, KillSwitchConfig): + ks_cfg = KillSwitchConfig() # disabled by default if not configured + self._kill_switch = KillSwitchEvaluator(ks_cfg) def create_session(self) -> tuple[SessionState, AuditChain]: """ @@ -62,6 +69,15 @@ def create_session(self) -> tuple[SessionState, AuditChain]: performs the root comparison - the security guarantee is limited to what a software TEE provides, and a warning is emitted. """ + # Kill switch: reject sessions for blocked agent identities before allocating resources. + binding = getattr(self._ctx, "agent_manifest", None) + if isinstance(binding, AgentManifestBinding) and self._kill_switch.is_blocked(binding.agent_id): + raise KillSwitchTripped( + f"Session rejected: agent identity {binding.agent_id!r} has tripped the " + "kill switch. Contact the platform operator to unblock.", + detail=binding.agent_id, + ) + session_id = str(uuid4()) state = SessionState(session_id=session_id) chain = AuditChain(session_id=session_id, store=self._ctx.audit_store) @@ -241,6 +257,35 @@ def close_session( suspicious_sequences_detected=state.suspicious_sequences, ) + # Kill switch: record this session's outcomes and evaluate. + # Only evaluated when an agent manifest is bound (anonymous sessions have no identity to block). + ks_binding = getattr(ctx, "agent_manifest", None) + if not isinstance(ks_binding, AgentManifestBinding): + ks_binding = None + kill_switch_triggered = False + if ks_binding is not None: + self._kill_switch.record_calls( + ks_binding.agent_id, + allowed=tool_calls_allowed, + denied=tool_calls_denied, + ) + kill_switch_triggered = self._kill_switch.evaluate(ks_binding.agent_id) + if kill_switch_triggered: + state.kill_switch_triggered = True + chain.append( + "break_glass_used", + detail={ + "reason": "kill_switch_triggered", + "agent_id": ks_binding.agent_id, + "deny_rate_window_seconds": ctx.config.kill_switch.window_seconds, + }, + ) + logger.warning( + "Kill switch triggered: agent_id=%s deny_rate exceeded threshold. " + "Future sessions for this identity will be rejected.", + ks_binding.agent_id, + ) + agent_identity: AgentIdentityInfo | None = None binding = getattr(ctx, "agent_manifest", None) if not isinstance(binding, AgentManifestBinding): @@ -278,6 +323,7 @@ def close_session( agent_identity=agent_identity, sequence_number=_CLAIM_SEQUENCE, prev_claim_hash=self._last_claim_hash, + kill_switch_triggered=kill_switch_triggered, do_sign=True, ) diff --git a/src/cmcp_runtime/session/state.py b/src/cmcp_runtime/session/state.py index 3eba5bf..01537b2 100644 --- a/src/cmcp_runtime/session/state.py +++ b/src/cmcp_runtime/session/state.py @@ -56,6 +56,7 @@ class SessionState: suspicious_sequences: int = 0 attestation_stale: bool = False catalog_drift: bool = False + kill_switch_triggered: bool = False # AUTH-002: guards concurrent mutations from tool-call coroutines and session-reset requests mutation_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False, repr=False, compare=False) diff --git a/tests/unit/test_kill_switch.py b/tests/unit/test_kill_switch.py new file mode 100644 index 0000000..7e4b2d7 --- /dev/null +++ b/tests/unit/test_kill_switch.py @@ -0,0 +1,307 @@ +"""Unit tests for KillSwitchEvaluator and SessionManager integration (issue #341).""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from cmcp_runtime.agent_manifest import AgentManifestBinding +from cmcp_runtime.audit.chain import AuditChain +from cmcp_runtime.audit.keys import SigningKey +from cmcp_runtime.config import KillSwitchConfig +from cmcp_runtime.errors import KillSwitchTripped +from cmcp_runtime.kill_switch import KillSwitchEvaluator +from cmcp_runtime.session.manager import SessionManager +from cmcp_runtime.session.state import SessionState + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +_AGENT_ID = "spiffe://example.com/agent/rogue-bot" + + +def _ks_config(*, enabled: bool = True, threshold: float = 0.9, min_calls: int = 5, window: int = 300) -> KillSwitchConfig: + return KillSwitchConfig( + enabled=enabled, + window_seconds=window, + deny_rate_threshold=threshold, + min_calls=min_calls, + ) + + +def _make_manifest(agent_id: str = _AGENT_ID) -> AgentManifestBinding: + return AgentManifestBinding( + manifest_id="0197739a-8c00-7000-8000-000000000001", + agent_id=agent_id, + authenticated_subject=agent_id, + subject_source="config", + issuer="spiffe://example.com/signing-authority/prod", + issuer_key_id="a" * 64, + policy_bundle_hash="sha256:" + "a" * 64, + tool_catalog_hash="sha256:" + "b" * 64, + ) + + +def _make_ctx(*, ks_config: KillSwitchConfig | None = None, agent_manifest: AgentManifestBinding | None = None) -> MagicMock: + from datetime import UTC, datetime + + signing_key = SigningKey() + + policy_bundle = MagicMock() + policy_bundle.bundle.bundle_hash = "sha256:" + "a" * 64 + policy_bundle.bundle.manifest.version = "1.0.0" + + catalog = MagicMock() + catalog.catalog_hash = "sha256:" + "b" * 64 + catalog.entries = {} + catalog.exceptions = [] + + config = MagicMock() + config.attestation.enforcement_mode = "enforcing" + config.kill_switch = ks_config or _ks_config() + + attestation_report = MagicMock() + attestation_report.provider = "software-only" + attestation_report.measurement = "DEVELOPMENT_ONLY_NOT_FOR_PRODUCTION" + attestation_report.report_data = "aa" * 32 + attestation_report.raw_evidence = None + attestation_report.measurement_note = "software-only mode" + attestation_report.attestation_validity_seconds = 86400 + attestation_report.attestation_generated_at = datetime.now(UTC) + + tee_provider = MagicMock() + tee_provider.get_attestation_report.return_value = MagicMock() + + ctx = MagicMock() + ctx.signing_key = signing_key + ctx.attestation_report = attestation_report + ctx.policy_bundle = policy_bundle + ctx.catalog = catalog + ctx.config = config + ctx.tee_provider = tee_provider + ctx.agent_manifest = agent_manifest + ctx.audit_store = None + return ctx + + +# ── KillSwitchEvaluator unit tests ──────────────────────────────────────────── + + +class TestKillSwitchEvaluator: + def test_disabled_always_returns_false(self) -> None: + ev = KillSwitchEvaluator(_ks_config(enabled=False)) + ev.record_calls(_AGENT_ID, allowed=0, denied=100) + assert ev.evaluate(_AGENT_ID) is False + + def test_below_min_calls_not_tripped(self) -> None: + ev = KillSwitchEvaluator(_ks_config(min_calls=10)) + ev.record_calls(_AGENT_ID, allowed=0, denied=5) + assert ev.evaluate(_AGENT_ID) is False + + def test_below_threshold_not_tripped(self) -> None: + ev = KillSwitchEvaluator(_ks_config(threshold=0.9, min_calls=5)) + ev.record_calls(_AGENT_ID, allowed=5, denied=4) # 44% deny rate + assert ev.evaluate(_AGENT_ID) is False + + def test_at_threshold_is_tripped(self) -> None: + ev = KillSwitchEvaluator(_ks_config(threshold=0.9, min_calls=10)) + ev.record_calls(_AGENT_ID, allowed=1, denied=9) # exactly 90% + assert ev.evaluate(_AGENT_ID) is True + + def test_above_threshold_is_tripped(self) -> None: + ev = KillSwitchEvaluator(_ks_config(threshold=0.8, min_calls=5)) + ev.record_calls(_AGENT_ID, allowed=0, denied=10) # 100% + assert ev.evaluate(_AGENT_ID) is True + + def test_is_blocked_after_tripped(self) -> None: + ev = KillSwitchEvaluator(_ks_config(threshold=0.9, min_calls=10)) + ev.record_calls(_AGENT_ID, allowed=1, denied=9) + ev.evaluate(_AGENT_ID) + assert ev.is_blocked(_AGENT_ID) is True + + def test_is_not_blocked_before_trip(self) -> None: + ev = KillSwitchEvaluator(_ks_config()) + assert ev.is_blocked(_AGENT_ID) is False + + def test_unblock_clears_flag_and_events(self) -> None: + ev = KillSwitchEvaluator(_ks_config(threshold=0.9, min_calls=10)) + ev.record_calls(_AGENT_ID, allowed=1, denied=9) + ev.evaluate(_AGENT_ID) + assert ev.is_blocked(_AGENT_ID) is True + ev.unblock(_AGENT_ID) + assert ev.is_blocked(_AGENT_ID) is False + # Events cleared — below min_calls after unblock + assert ev.evaluate(_AGENT_ID) is False + + def test_separate_agent_ids_are_independent(self) -> None: + other = "spiffe://example.com/agent/well-behaved" + ev = KillSwitchEvaluator(_ks_config(threshold=0.9, min_calls=5)) + ev.record_calls(_AGENT_ID, allowed=0, denied=10) + ev.evaluate(_AGENT_ID) + ev.record_calls(other, allowed=10, denied=0) + assert ev.is_blocked(_AGENT_ID) is True + assert ev.is_blocked(other) is False + + def test_record_allowed_and_denied_accumulate(self) -> None: + ev = KillSwitchEvaluator(_ks_config(threshold=0.8, min_calls=5)) + ev.record_calls(_AGENT_ID, allowed=5, denied=0) # 0% deny + assert ev.evaluate(_AGENT_ID) is False + # Add enough denies to cross threshold + ev.record_calls(_AGENT_ID, allowed=0, denied=20) # now ~80% deny over 25 total + assert ev.evaluate(_AGENT_ID) is True + + def test_window_expired_events_ignored(self) -> None: + import time + ev = KillSwitchEvaluator(_ks_config(threshold=0.9, min_calls=5, window=1)) + ev.record_calls(_AGENT_ID, allowed=0, denied=10) + # Sleep past the window + time.sleep(1.1) + # Now add clean events + ev.record_calls(_AGENT_ID, allowed=10, denied=0) + assert ev.evaluate(_AGENT_ID) is False + + +# ── SessionManager kill switch integration tests ─────────────────────────────── + + +class TestSessionManagerKillSwitch: + def test_close_session_no_manifest_kill_switch_not_triggered(self) -> None: + """Anonymous sessions never trigger the kill switch.""" + ctx = _make_ctx(ks_config=_ks_config(threshold=0.5, min_calls=1), agent_manifest=None) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + chain.append("tool_call", call_id="c1", tool_name="t", policy_decision="deny") + claim = mgr.close_session(state.session_id, state, chain) + assert claim["gateway"]["kill_switch_triggered"] is False + + def test_close_session_below_threshold_not_triggered(self) -> None: + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.9, min_calls=5), + agent_manifest=_make_manifest(), + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + # 3 allows, 2 denies = 40% deny rate — below 90% + for i in range(3): + chain.append("tool_call", call_id=f"a{i}", tool_name="t", policy_decision="allow") + for i in range(2): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + claim = mgr.close_session(state.session_id, state, chain) + assert claim["gateway"]["kill_switch_triggered"] is False + + def test_close_session_at_threshold_triggered(self) -> None: + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.9, min_calls=10), + agent_manifest=_make_manifest(), + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + # 1 allow, 9 denies = exactly 90% + chain.append("tool_call", call_id="a0", tool_name="t", policy_decision="allow") + for i in range(9): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + claim = mgr.close_session(state.session_id, state, chain) + assert claim["gateway"]["kill_switch_triggered"] is True + + def test_kill_switch_triggers_audit_entry(self) -> None: + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.9, min_calls=10), + agent_manifest=_make_manifest(), + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + chain.append("tool_call", call_id="a0", tool_name="t", policy_decision="allow") + for i in range(9): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + mgr.close_session(state.session_id, state, chain) + entry_types = [e.entry_type for e in chain.entries] + assert "break_glass_used" in entry_types + + def test_kill_switch_sets_state_flag(self) -> None: + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.9, min_calls=10), + agent_manifest=_make_manifest(), + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + chain.append("tool_call", call_id="a0", tool_name="t", policy_decision="allow") + for i in range(9): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + mgr.close_session(state.session_id, state, chain) + assert state.kill_switch_triggered is True + + def test_create_session_rejected_after_kill_switch(self) -> None: + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.9, min_calls=10), + agent_manifest=_make_manifest(), + ) + mgr = SessionManager(ctx) + # First session: trip the kill switch + state, chain = mgr.create_session() + chain.append("tool_call", call_id="a0", tool_name="t", policy_decision="allow") + for i in range(9): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + mgr.close_session(state.session_id, state, chain) + # Second session: must raise KillSwitchTripped + with pytest.raises(KillSwitchTripped): + mgr.create_session() + + def test_create_session_anonymous_never_blocked(self) -> None: + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.0, min_calls=1), + agent_manifest=None, + ) + mgr = SessionManager(ctx) + # Even with 0.0 threshold, anonymous sessions are never blocked + state, chain = mgr.create_session() + chain.append("tool_call", call_id="d0", tool_name="t", policy_decision="deny") + mgr.close_session(state.session_id, state, chain) + # Must not raise + state2, _ = mgr.create_session() + assert state2 is not None + + def test_kill_switch_disabled_never_triggers(self) -> None: + ctx = _make_ctx( + ks_config=_ks_config(enabled=False, threshold=0.0, min_calls=1), + agent_manifest=_make_manifest(), + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + for i in range(10): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + claim = mgr.close_session(state.session_id, state, chain) + assert claim["gateway"]["kill_switch_triggered"] is False + # Must not raise on second session + mgr.create_session() + + def test_kill_switch_error_carries_agent_id(self) -> None: + agent_id = "spiffe://example.com/agent/bad-actor" + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.9, min_calls=10), + agent_manifest=_make_manifest(agent_id), + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + chain.append("tool_call", call_id="a0", tool_name="t", policy_decision="allow") + for i in range(9): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + mgr.close_session(state.session_id, state, chain) + with pytest.raises(KillSwitchTripped) as exc_info: + mgr.create_session() + assert agent_id in exc_info.value.detail + + def test_advisory_deny_counts_toward_kill_switch(self) -> None: + """advisory_deny counts as a deny in kill switch evaluation (matches close_session logic).""" + ctx = _make_ctx( + ks_config=_ks_config(threshold=0.9, min_calls=10), + agent_manifest=_make_manifest(), + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + chain.append("tool_call", call_id="a0", tool_name="t", policy_decision="allow") + for i in range(4): + chain.append("tool_call", call_id=f"d{i}", tool_name="t", policy_decision="deny") + for i in range(5): + chain.append("tool_call", call_id=f"ad{i}", tool_name="t", policy_decision="advisory_deny") + claim = mgr.close_session(state.session_id, state, chain) + assert claim["gateway"]["kill_switch_triggered"] is True From 8dbc6b357aeecc1c24daecee2b285d67d5c25655 Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Thu, 25 Jun 2026 14:14:13 -0700 Subject: [PATCH 2/2] fix(lint): unquote type annotation, use union type, remove unused imports - kill_switch.py: remove quotes from KillSwitchConfig annotation (UP037) - config.py: use int | float union syntax in isinstance (UP038) - test_kill_switch.py: remove unused AuditChain and SessionState imports (F401) Co-Authored-By: Claude Sonnet 4.6 --- src/cmcp_runtime/config.py | 2 +- src/cmcp_runtime/kill_switch.py | 2 +- tests/unit/test_kill_switch.py | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/cmcp_runtime/config.py b/src/cmcp_runtime/config.py index 029c08e..778dcd7 100644 --- a/src/cmcp_runtime/config.py +++ b/src/cmcp_runtime/config.py @@ -178,7 +178,7 @@ def load_config(path: str) -> Config: if not isinstance(ks_window, int) or ks_window <= 0: raise ConfigError("kill_switch.window_seconds must be a positive integer") ks_threshold = ks_raw.get("deny_rate_threshold", 0.9) - if not isinstance(ks_threshold, (int, float)) or not (0.0 < ks_threshold <= 1.0): + if not isinstance(ks_threshold, int | float) or not (0.0 < ks_threshold <= 1.0): raise ConfigError("kill_switch.deny_rate_threshold must be a float in (0, 1]") ks_min_calls = ks_raw.get("min_calls", 10) if not isinstance(ks_min_calls, int) or ks_min_calls <= 0: diff --git a/src/cmcp_runtime/kill_switch.py b/src/cmcp_runtime/kill_switch.py index fdba9bd..0f37810 100644 --- a/src/cmcp_runtime/kill_switch.py +++ b/src/cmcp_runtime/kill_switch.py @@ -26,7 +26,7 @@ class KillSwitchEvaluator: close_session() is called synchronously, so no lock is needed. """ - def __init__(self, config: "KillSwitchConfig") -> None: + def __init__(self, config: KillSwitchConfig) -> None: self._config = config # agent_id -> deque of (monotonic_time, is_deny: bool) self._events: dict[str, deque[tuple[float, bool]]] = defaultdict(deque) diff --git a/tests/unit/test_kill_switch.py b/tests/unit/test_kill_switch.py index 7e4b2d7..212eea5 100644 --- a/tests/unit/test_kill_switch.py +++ b/tests/unit/test_kill_switch.py @@ -7,13 +7,11 @@ import pytest from cmcp_runtime.agent_manifest import AgentManifestBinding -from cmcp_runtime.audit.chain import AuditChain from cmcp_runtime.audit.keys import SigningKey from cmcp_runtime.config import KillSwitchConfig from cmcp_runtime.errors import KillSwitchTripped from cmcp_runtime.kill_switch import KillSwitchEvaluator from cmcp_runtime.session.manager import SessionManager -from cmcp_runtime.session.state import SessionState # ── Helpers ────────────────────────────────────────────────────────────────────