diff --git a/README.md b/README.md index a6c7d25..6b42144 100644 --- a/README.md +++ b/README.md @@ -162,11 +162,7 @@ from capiscio_mcp.integrations.mcp import CapiscioMCPServer # db is your application's database connection (asyncpg, databases, etc.) db = ... # e.g. databases.Database("postgresql://...") -server = CapiscioMCPServer( - name="data-api", - did="did:web:mcp.example.com:servers:data-api", - badge="eyJhbGc...", # From CapiscIO registry -) +server = CapiscioMCPServer.connect() @server.tool(min_trust_level=2) async def get_user(user_id: int) -> dict: @@ -204,26 +200,23 @@ async with CapiscioMCPClient( print(result) ``` -## MCPServerIdentity.connect() — "Let's Encrypt" Style Setup +## CapiscioMCPServer.connect() — “Let's Encrypt” Style Setup Register your MCP server and get a badge with a single call: ```python -from capiscio_mcp import MCPServerIdentity +from capiscio_mcp.integrations.mcp import CapiscioMCPServer -identity = await MCPServerIdentity.connect( - server_id="550e8400-...", # From the dashboard - api_key="sk_live_...", -) +server = CapiscioMCPServer.connect() -print(identity.did) # did:web:registry.capisc.io:servers:550e8400-... -print(identity.badge) # Current badge JWS (auto-issued) +print(server.did) # did:web:registry.capisc.io:servers:550e8400-... +print(server.badge) # Current badge JWS (auto-issued) ``` ### Using Environment Variables ```python -identity = await MCPServerIdentity.from_env() +server = CapiscioMCPServer.connect() ``` | Variable | Required | Description | @@ -424,6 +417,7 @@ config = VerifyConfig( Requires `pip install capiscio-mcp[mcp]`: +- `CapiscioMCPServer.connect()` — One-liner: load identity from env and create server - `CapiscioMCPServer(name, did, badge, ...)` — FastMCP wrapper with trust enforcement - `CapiscioMCPServer.tool(min_trust_level=...)` — Decorator for guarded tools - `CapiscioMCPServer.run(transport="stdio")` — Run the server diff --git a/capiscio_mcp/__init__.py b/capiscio_mcp/__init__.py index ef96e03..1e53ea4 100644 --- a/capiscio_mcp/__init__.py +++ b/capiscio_mcp/__init__.py @@ -10,7 +10,7 @@ - Server identity registration for MCP servers - PoP (Proof of Possession) handshake for server key verification - Evidence logging for audit and forensics -- One-line server identity setup via MCPServerIdentity.connect() +- One-line server identity setup via CapiscioMCPServer.connect() Installation: pip install capiscio-mcp # Standalone @@ -18,14 +18,9 @@ pip install capiscio-mcp[crypto] # With PoP signing/verification Quickstart ("Let's Encrypt" style — recommended): - from capiscio_mcp import MCPServerIdentity from capiscio_mcp.integrations.mcp import CapiscioMCPServer - identity = await MCPServerIdentity.connect( - server_id=os.environ["CAPISCIO_SERVER_ID"], - api_key=os.environ["CAPISCIO_API_KEY"], - ) - server = CapiscioMCPServer(identity=identity) + server = CapiscioMCPServer.connect() @server.tool(min_trust_level=2) async def read_file(path: str) -> str: @@ -61,6 +56,13 @@ async def read_database(query: str) -> list[dict]: print(f"Server DID: {result['did']}") """ +import os as _os + +# Suppress gRPC C-core stderr noise (ev_poll_posix.cc, fork_posix.cc, etc.) +# before any gRPC import. Library users should not see low-level C-core logs. +_os.environ.setdefault("GRPC_VERBOSITY", "NONE") +_os.environ.setdefault("GRPC_TRACE", "") + from capiscio_mcp.types import ( Decision, AuthLevel, @@ -88,8 +90,6 @@ async def read_database(query: str) -> list[dict]: GuardResult, compute_params_hash, evaluate_tool_access, - set_pip_config, - get_pip_config, ) from capiscio_mcp.server import ( verify_server, @@ -165,8 +165,6 @@ async def read_database(query: str) -> list[dict]: "GuardResult", "compute_params_hash", "evaluate_tool_access", - "set_pip_config", - "get_pip_config", # Policy (RFC-005) "PIPConfig", "PolicyClient", diff --git a/capiscio_mcp/connect.py b/capiscio_mcp/connect.py index 191ba08..a8648f4 100644 --- a/capiscio_mcp/connect.py +++ b/capiscio_mcp/connect.py @@ -46,8 +46,6 @@ async def main(): from capiscio_mcp.keeper import ServerBadgeKeeper from capiscio_mcp.events import GuardEventEmitter, set_event_emitter -from capiscio_mcp.guard import set_pip_config -from capiscio_mcp.pip import PIPConfig from capiscio_mcp.registration import ( RegistrationError, generate_server_keypair, @@ -280,7 +278,6 @@ async def connect( auto_badge: bool = True, renewal_threshold: int = 30, on_badge_renew: Optional[Callable[[str], None]] = None, - pdp_endpoint: Optional[str] = None, ) -> "MCPServerIdentity": """Connect to CapiscIO and get a fully-configured MCP server identity. @@ -306,10 +303,6 @@ async def connect( auto_badge: If ``True``, issue an initial badge and start auto-renewal. renewal_threshold: Renew badge this many seconds before expiry. on_badge_renew: Optional callback ``(badge: str) -> None`` on renewal. - pdp_endpoint: Optional remote PDP URL for org-policy enforcement. - Defaults to empty (local OPA bundle evaluation via Go core). - Use ``CAPISCIO_PDP_ENDPOINT`` env var or this param only when - a remote PDP service is explicitly deployed. Returns: :class:`MCPServerIdentity` with ``.did``, ``.badge``, ``.keys_dir``, @@ -538,13 +531,15 @@ async def connect( if auto_badge: badge = await _issue_badge(server_id, effective_api_key, server_url, domain=domain) if badge: + effective_domain = domain or _derive_domain(server_url) keeper = ServerBadgeKeeper( server_id=server_id, - api_key=api_key, + api_key=effective_api_key, initial_badge=badge, ca_url=server_url, renewal_threshold=renewal_threshold, on_renew=on_badge_renew, + domain=effective_domain, ) keeper.start() else: @@ -563,27 +558,6 @@ async def connect( ) ) - # Step 7: Auto-configure PDP for org-policy enforcement - # Policy evaluation is LOCAL: the Go core fetches the OPA bundle via - # CAPISCIO_BUNDLE_URL and evaluates it with its embedded OPA engine. - # pdp_endpoint is only needed if a remote PDP is explicitly configured. - effective_pdp = ( - pdp_endpoint - or os.environ.get("CAPISCIO_PDP_ENDPOINT") - or "" - ) - set_pip_config( - PIPConfig( - pdp_endpoint=effective_pdp, - pep_id=f"mcp-server:{server_id}", - workspace=server_id, - ) - ) - if effective_pdp: - logger.info("Remote PDP configured: pdp_endpoint=%s", effective_pdp) - else: - logger.debug("Using local OPA bundle for policy evaluation") - return cls( server_id=server_id, did=did, # type: ignore[arg-type] @@ -605,7 +579,6 @@ async def from_env(cls, **kwargs: Any) -> "MCPServerIdentity": - ``CAPISCIO_API_KEY`` (required) - ``CAPISCIO_SERVER_URL`` (optional, default: production) - ``CAPISCIO_SERVER_DOMAIN`` (optional, default: hostname from SERVER_URL) - - ``CAPISCIO_PDP_ENDPOINT`` (optional — PDP URL for org-policy enforcement) - ``CAPISCIO_SERVER_PRIVATE_KEY_PEM`` (optional — PEM-encoded Ed25519 private key for ephemeral environments; printed on first generation) @@ -674,3 +647,41 @@ async def from_env(cls, **kwargs: Any) -> "MCPServerIdentity": domain=domain or None, **kwargs, ) + + # ------------------------------------------------------------------ + # Sync convenience methods + # ------------------------------------------------------------------ + + @classmethod + def connect_sync( + cls, + server_id: str, + api_key: Optional[str] = None, + **kwargs: Any, + ) -> "MCPServerIdentity": + """Synchronous version of :meth:`connect`. + + Accepts the same arguments. Runs ``connect()`` in a fresh event loop + so callers don't need ``asyncio.run()`` boilerplate. + + Example:: + + identity = MCPServerIdentity.connect_sync( + server_id=os.environ["CAPISCIO_SERVER_ID"], + api_key=os.environ["CAPISCIO_API_KEY"], + ) + """ + return asyncio.run(cls.connect(server_id, api_key, **kwargs)) + + @classmethod + def from_env_sync(cls, **kwargs: Any) -> "MCPServerIdentity": + """Synchronous version of :meth:`from_env`. + + Reads the same environment variables. Runs ``from_env()`` in a fresh + event loop so callers don't need ``asyncio.run()`` boilerplate. + + Example:: + + identity = MCPServerIdentity.from_env_sync() + """ + return asyncio.run(cls.from_env(**kwargs)) diff --git a/capiscio_mcp/guard.py b/capiscio_mcp/guard.py index d1a7730..215cca6 100644 --- a/capiscio_mcp/guard.py +++ b/capiscio_mcp/guard.py @@ -34,6 +34,8 @@ async def execute_query(sql: str) -> list[dict]: import hashlib import json import logging +import threading +import time from dataclasses import dataclass, field from datetime import datetime, timezone from functools import wraps @@ -41,9 +43,11 @@ async def execute_query(sql: str) -> list[dict]: Any, Callable, Coroutine, + Dict, List, Optional, ParamSpec, + Tuple, TypeVar, Union, overload, @@ -58,7 +62,6 @@ async def execute_query(sql: str) -> list[dict]: ) from capiscio_mcp.errors import GuardError, GuardConfigError from capiscio_mcp.events import get_event_emitter -from capiscio_mcp.pip import PIPConfig, PolicyClient logger = logging.getLogger(__name__) @@ -85,30 +88,51 @@ async def execute_query(sql: str) -> list[dict]: contextvars.ContextVar("caller_badge", default=None) ) -# Module-level PIP config singleton (set by MCPServerIdentity.connect() or manually) -_pip_config: Optional[PIPConfig] = None - - -def set_pip_config(config: Optional[PIPConfig]) -> None: - """Set the module-level PIP configuration for org policy evaluation. - - When set, ``@guard`` performs a second-phase policy check via - ``EvaluatePolicyDecision`` after the inline trust-level check. The - stricter decision wins. - - Typically auto-configured by ``MCPServerIdentity.connect()``. - - Args: - config: PIP configuration with ``pdp_endpoint`` set, or ``None`` - to disable org-policy checks. - """ - global _pip_config - _pip_config = config - - -def get_pip_config() -> Optional[PIPConfig]: - """Return the current module-level PIP configuration, or ``None``.""" - return _pip_config +# ── Decision cache ────────────────────────────────────────────────── +# Keyed on (badge_jws | "", tool_name). Same badge string = same JTI, +# same signature, same claims — the decision cannot change until the +# badge refreshes (producing a new JWS, which is a new cache key). +# +# TTL is conservative (5 s) so org-policy changes propagate quickly. +# In practice this eliminates the gRPC round-trip for repeated tool +# calls within a burst: first call ~3 ms, subsequent calls ~0.01 ms. +_DECISION_CACHE_TTL = 5.0 # seconds +_DECISION_CACHE_MAX_SIZE = 256 # max entries before eviction +_decision_cache: Dict[Tuple[str, str], Tuple["GuardResult", float]] = {} +_decision_cache_lock = threading.Lock() + + +def _cache_get(badge_jws: str, tool_name: str) -> Optional["GuardResult"]: + """Return cached decision if still valid, else None.""" + with _decision_cache_lock: + entry = _decision_cache.get((badge_jws, tool_name)) + if entry is None: + return None + result, expiry = entry + if time.monotonic() > expiry: + del _decision_cache[(badge_jws, tool_name)] + return None + return result + + +def _cache_put(badge_jws: str, tool_name: str, result: "GuardResult") -> None: + """Store a decision in the cache.""" + with _decision_cache_lock: + # Evict expired entries if cache is at capacity + if len(_decision_cache) >= _DECISION_CACHE_MAX_SIZE: + now = time.monotonic() + expired = [k for k, (_, exp) in _decision_cache.items() if now > exp] + for k in expired: + del _decision_cache[k] + # If still at capacity after expiry sweep, evict oldest entries + if len(_decision_cache) >= _DECISION_CACHE_MAX_SIZE: + oldest = sorted(_decision_cache, key=lambda k: _decision_cache[k][1]) + for k in oldest[:len(_decision_cache) - _DECISION_CACHE_MAX_SIZE + 1]: + del _decision_cache[k] + _decision_cache[(badge_jws, tool_name)] = ( + result, + time.monotonic() + _DECISION_CACHE_TTL, + ) @dataclass @@ -130,8 +154,7 @@ class GuardConfig: allowed_tools: Optional[List[str]] = None policy_version: Optional[str] = None require_badge: bool = False - pip_config: Optional[PIPConfig] = None - + def __post_init__(self) -> None: """Validate configuration on creation.""" self.validate() @@ -301,6 +324,13 @@ async def evaluate_tool_access( effective_config = config or GuardConfig() effective_credential = credential or get_credential() or CallerCredential() + # Check decision cache — same badge + same tool = same decision + cache_key_jws = effective_credential.badge_jws or "" + cached = _cache_get(cache_key_jws, tool_name) + if cached is not None: + logger.debug("Decision cache hit: tool=%s decision=%s", tool_name, cached.decision.value) + return cached + # Compute params hash locally (PII never leaves Python) params_hash = compute_params_hash(params) server_origin = get_server_origin() @@ -339,25 +369,11 @@ async def evaluate_tool_access( if deny_on_unknown_class is not None: request.deny_on_unknown_class = deny_on_unknown_class - # Make RPC call + # Make RPC call — the Go core handles both badge verification AND + # local OPA policy evaluation in a single round-trip. Policy bundles + # are downloaded at init and polled every 30 s; no per-request HTTP. response = await client.stub.EvaluateToolAccess(request) - - # Phase 2: Org policy check via PDP (if configured) - # The inline check (trust level, allowed tools) runs first. - # If that passes, the PDP gets a say — the stricter decision wins. - pip_cfg = (effective_config.pip_config or _pip_config) - if ( - pip_cfg is not None - and pip_cfg.pdp_endpoint - and response.decision == mcp_pb2.ALLOW - ): - response = await _evaluate_org_policy( - pip_cfg=pip_cfg, - inline_response=response, - tool_name=tool_name, - capability_class=capability_class, - ) - + # Map response to GuardResult decision = Decision.ALLOW if response.decision == mcp_pb2.ALLOW else Decision.DENY @@ -383,7 +399,7 @@ async def evaluate_tool_access( } auth_level = auth_level_map.get(response.auth_level, AuthLevel.ANONYMOUS) - return GuardResult( + result = GuardResult( decision=decision, deny_reason=deny_reason, deny_detail=response.deny_detail or None, @@ -398,73 +414,10 @@ async def evaluate_tool_access( presented_capability=response.presented_capability or None, ) + # Cache for subsequent calls with the same badge + tool + _cache_put(cache_key_jws, tool_name, result) -async def _evaluate_org_policy( - *, - pip_cfg: PIPConfig, - inline_response: Any, - tool_name: str, - capability_class: Optional[str] = None, -) -> Any: - """Second-phase org-policy check via EvaluatePolicyDecision RPC. - - Called only when the inline evaluation (phase 1) returned ALLOW and a PDP - endpoint is configured. If the PDP says DENY, we override the inline - response so that the stricter decision wins. - - If the PDP is unreachable we respect the enforcement mode configured in - ``pip_cfg`` (default EM-OBSERVE = allow through with a warning). - - Returns: - The original ``inline_response`` (possibly mutated to DENY). - """ - from capiscio_mcp._proto.capiscio.v1 import mcp_pb2 - - try: - policy_client = PolicyClient(pip_cfg) - policy_result = await policy_client.evaluate( - subject_did=inline_response.agent_did or "", - badge_jti=inline_response.badge_jti or "", - trust_level=str(inline_response.trust_level), - operation=tool_name, - capability_class=capability_class or "", - ) - - if policy_result.denied: - logger.info( - "Org policy DENY override: tool=%s agent=%s reason=%s decision_id=%s", - tool_name, - inline_response.agent_did, - policy_result.reason, - policy_result.decision_id, - ) - # Mutate the inline response so downstream mapping picks it up - inline_response.decision = mcp_pb2.DENY - inline_response.deny_reason = mcp_pb2.TOOL_POLICY_DENIED - inline_response.deny_detail = ( - policy_result.reason or "Denied by organization policy" - ) - elif policy_result.pdp_error: - logger.warning( - "PDP unavailable during org-policy check: error_code=%s tool=%s", - policy_result.error_code, - tool_name, - ) - # The Go core already applied the enforcement-mode fallback. - # EM-OBSERVE → ALLOW_OBSERVE (pass through). - # EM-GUARD/EM-STRICT → DENY (fail-closed). - if policy_result.decision == "DENY": - inline_response.decision = mcp_pb2.DENY - inline_response.deny_reason = mcp_pb2.TOOL_POLICY_DENIED - inline_response.deny_detail = "Policy service unavailable" - except Exception: - logger.warning( - "Org policy evaluation failed for tool=%s — allowing through (best-effort)", - tool_name, - exc_info=True, - ) - - return inline_response + return result def _emit_deny_event( @@ -629,7 +582,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # Check decision if result.decision == Decision.DENY: - logger.warning( + logger.info( "capiscio.policy_enforced: tool=%s decision=DENY " "agent=%s trust_level=%s reason=%s error_code=%s " "requested_capability=%s presented_capability=%s " @@ -742,7 +695,7 @@ async def run_eval(): # Check decision if result.decision == Decision.DENY: - logger.warning( + logger.info( "capiscio.policy_enforced: tool=%s decision=DENY " "agent=%s trust_level=%s reason=%s error_code=%s " "requested_capability=%s presented_capability=%s " diff --git a/capiscio_mcp/integrations/mcp.py b/capiscio_mcp/integrations/mcp.py index 66669d7..c1bc5b1 100644 --- a/capiscio_mcp/integrations/mcp.py +++ b/capiscio_mcp/integrations/mcp.py @@ -11,14 +11,8 @@ Usage (Server):: from capiscio_mcp.integrations.mcp import CapiscioMCPServer - from capiscio_mcp import MCPServerIdentity - identity = await MCPServerIdentity.connect( - server_id=os.environ["CAPISCIO_SERVER_ID"], - api_key=os.environ["CAPISCIO_API_KEY"], - ) - - server = CapiscioMCPServer(identity=identity) + server = CapiscioMCPServer.connect() @server.tool(min_trust_level=2) async def read_file(path: str) -> str: @@ -297,15 +291,9 @@ class CapiscioMCPServer: Example:: - from capiscio_mcp import MCPServerIdentity from capiscio_mcp.integrations.mcp import CapiscioMCPServer - identity = await MCPServerIdentity.connect( - server_id=os.environ["CAPISCIO_SERVER_ID"], - api_key=os.environ["CAPISCIO_API_KEY"], - ) - - server = CapiscioMCPServer(identity=identity) + server = CapiscioMCPServer.connect() @server.tool(min_trust_level=2) async def read_file(path: str) -> str: @@ -314,14 +302,15 @@ async def read_file(path: str) -> str: server.run() - You can also supply credentials directly (without ``MCPServerIdentity``):: + You can also supply an identity or credentials directly:: - server = CapiscioMCPServer( - name="filesystem", - did="did:web:mcp.example.com:servers:filesystem", - badge=os.environ.get("SERVER_BADGE"), - private_key_path="/path/to/server.key.pem", + from capiscio_mcp import MCPServerIdentity + + identity = await MCPServerIdentity.connect( + server_id=os.environ["CAPISCIO_SERVER_ID"], + api_key=os.environ["CAPISCIO_API_KEY"], ) + server = CapiscioMCPServer(identity=identity) """ def __init__( @@ -398,6 +387,36 @@ def __init__( # for stdio transport (where HTTP headers are not available). _install_credential_extraction(self._server) + # ------------------------------------------------------------------ + # Factory classmethods + # ------------------------------------------------------------------ + + @classmethod + def connect(cls, **kwargs: Any) -> "CapiscioMCPServer": + """Connect to the CapiscIO registry and return a ready-to-use server. + + Combines ``MCPServerIdentity.from_env_sync()`` and server construction + into a single call — the MCP server equivalent of ``CapiscIO.connect()`` + from the agent SDK. + + Any extra keyword arguments are forwarded to the constructor + (e.g. ``default_min_trust_level``, ``name``). + + Example:: + + server = CapiscioMCPServer.connect() + + @server.tool(min_trust_level=1) + async def place_order(sku: str, quantity: int) -> str: + ... + + server.run() + """ + from capiscio_mcp.connect import MCPServerIdentity + + identity = MCPServerIdentity.from_env_sync() + return cls(identity=identity, **kwargs) + # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ @@ -561,6 +580,12 @@ def run(self, transport: str = "stdio") -> None: self._server.run(transport=transport) finally: _capiscio_meta_ctx.reset(token) + # Flush pending telemetry events before the process exits + from capiscio_mcp.events import get_event_emitter + + emitter = get_event_emitter() + if emitter is not None: + emitter.flush(timeout=5.0) def run_stdio(self) -> None: """Run over stdio transport (deprecated — use :meth:`run` instead).""" @@ -939,6 +964,21 @@ def is_verified(self) -> bool: and self._verify_result.state == ServerState.VERIFIED_PRINCIPAL ) + # ------------------------------------------------------------------ + # Credential management + # ------------------------------------------------------------------ + + def set_badge(self, badge: Optional[str]) -> None: + """Update the client badge for subsequent tool calls. + + Useful for demos or test harnesses that reuse a single server + connection but need to impersonate different agents. + + Args: + badge: New badge JWS string, or ``None`` to clear. + """ + self._credential.badge_jws = badge + # ------------------------------------------------------------------ # Tool calls # ------------------------------------------------------------------ diff --git a/capiscio_mcp/keeper.py b/capiscio_mcp/keeper.py index 2604d0b..8200bf0 100644 --- a/capiscio_mcp/keeper.py +++ b/capiscio_mcp/keeper.py @@ -88,6 +88,7 @@ def __init__( renewal_threshold: int = DEFAULT_RENEWAL_THRESHOLD, check_interval: int = DEFAULT_CHECK_INTERVAL, on_renew: Optional[Callable[[str], None]] = None, + domain: Optional[str] = None, ) -> None: """Initialize ServerBadgeKeeper. @@ -99,6 +100,8 @@ def __init__( renewal_threshold: Renew when ``exp - now <= renewal_threshold`` seconds. check_interval: How often (seconds) the background thread wakes to check. on_renew: Optional callback called with the new badge string after each renewal. + domain: Domain for badge issuance (e.g. ``"tools.example.com"``). + Required by the registry if the server has no default domain. """ self.server_id = server_id self.api_key = api_key @@ -106,6 +109,7 @@ def __init__( self.renewal_threshold = renewal_threshold self.check_interval = check_interval self.on_renew = on_renew + self.domain = domain self._current_badge: Optional[str] = initial_badge self._badge_lock = threading.Lock() @@ -221,8 +225,11 @@ def _renew(self) -> None: "X-Capiscio-Registry-Key": self.api_key, "Content-Type": "application/json", } + body: dict = {} + if self.domain: + body["domain"] = self.domain try: - resp = requests.post(url, headers=headers, json={}, timeout=30) + resp = requests.post(url, headers=headers, json=body, timeout=30) if resp.status_code in (200, 201): try: data = resp.json() @@ -230,8 +237,10 @@ def _renew(self) -> None: logger.warning("Badge renewal response was not valid JSON: %s", exc) return # Try multiple common response shapes + inner = data.get("data") or {} new_badge = ( - (data.get("data") or {}).get("badge") + inner.get("badge") + or inner.get("token") or data.get("badge") or data.get("token") ) diff --git a/docs/guides/deployment.md b/docs/guides/deployment.md index 0e0b51a..4577506 100644 --- a/docs/guides/deployment.md +++ b/docs/guides/deployment.md @@ -60,7 +60,7 @@ The SDK resolves the server identity in this order: ## Environment Variables Reference -All variables used by `MCPServerIdentity.connect()` and `MCPServerIdentity.from_env()`: +All variables used by `CapiscioMCPServer.connect()`: | Variable | Required | Description | |----------|----------|-------------| @@ -103,19 +103,12 @@ CMD ["python", "server.py"] ### Server Code ```python -from capiscio_mcp import MCPServerIdentity from capiscio_mcp.integrations.mcp import CapiscioMCPServer -async def main(): +def main(): # Reads CAPISCIO_SERVER_ID, CAPISCIO_API_KEY, and # CAPISCIO_SERVER_PRIVATE_KEY_PEM from environment - identity = await MCPServerIdentity.from_env() - - server = CapiscioMCPServer( - name="my-server", - did=identity.did, - badge=identity.badge, - ) + server = CapiscioMCPServer.connect() @server.tool(min_trust_level=2) async def my_tool(param: str) -> str: @@ -130,17 +123,17 @@ async def main(): ```python import json -from capiscio_mcp import MCPServerIdentity +from capiscio_mcp.integrations.mcp import CapiscioMCPServer -async def handler(event, context): +def handler(event, context): # Key injected via Lambda environment variables or Secrets Manager - identity = await MCPServerIdentity.from_env() + server = CapiscioMCPServer.connect() return { "statusCode": 200, "body": json.dumps({ - "server_did": identity.did, - "badge_valid": identity.badge is not None, + "server_did": server.did, + "badge_valid": server.badge is not None, }), } ``` diff --git a/tests/conftest.py b/tests/conftest.py index fdf0ef1..045784b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,15 @@ # Note: With pytest-asyncio in auto mode, no event_loop fixture needed +@pytest.fixture(autouse=True) +def _clear_decision_cache(): + """Clear the guard decision cache before each test to prevent cross-test pollution.""" + from capiscio_mcp.guard import _decision_cache + _decision_cache.clear() + yield + _decision_cache.clear() + + @pytest.fixture def mock_core_client(): """Mock CoreClient for unit tests.""" diff --git a/tests/test_guard.py b/tests/test_guard.py index abd1461..e622463 100644 --- a/tests/test_guard.py +++ b/tests/test_guard.py @@ -15,6 +15,10 @@ GuardResult, _caller_did, _caller_badge, + _cache_get, + _cache_put, + _decision_cache, + _DECISION_CACHE_MAX_SIZE, ) from capiscio_mcp.types import Decision, AuthLevel, DenyReason from capiscio_mcp.errors import GuardError, GuardConfigError @@ -521,3 +525,107 @@ async def test_evaluate_propagates_scope_fields(self, mock_core_client): assert result.error_code == "SCOPE_INSUFFICIENT" assert result.requested_capability == "network" assert result.presented_capability == "storage" + + +class TestDecisionCache: + """Tests for the guard decision cache.""" + + def test_cache_put_and_get(self): + """Stored decision is retrievable.""" + result = GuardResult( + decision=Decision.ALLOW, + agent_did="did:web:example.com:agents:test", + badge_jti="jti-1", + auth_level=AuthLevel.BADGE, + trust_level=2, + ) + _cache_put("badge-jws-1", "tool_a", result) + cached = _cache_get("badge-jws-1", "tool_a") + assert cached is not None + assert cached.decision == Decision.ALLOW + + def test_cache_miss_returns_none(self): + """Missing key returns None.""" + assert _cache_get("nonexistent-badge", "nonexistent-tool") is None + + def test_cache_expired_entry_returns_none(self): + """Expired entries are evicted and return None.""" + import time + result = GuardResult( + decision=Decision.ALLOW, + agent_did="did:web:example.com:agents:test", + badge_jti="jti-2", + auth_level=AuthLevel.BADGE, + trust_level=1, + ) + # Manually insert an already-expired entry + _decision_cache[("expired-badge", "tool_b")] = (result, time.monotonic() - 1) + assert _cache_get("expired-badge", "tool_b") is None + assert ("expired-badge", "tool_b") not in _decision_cache + + def test_cache_different_keys_are_independent(self): + """Different badge/tool combinations don't collide.""" + allow = GuardResult( + decision=Decision.ALLOW, + agent_did="did:web:a", + badge_jti="j1", + auth_level=AuthLevel.BADGE, + trust_level=2, + ) + deny = GuardResult( + decision=Decision.DENY, + agent_did="did:web:b", + badge_jti="j2", + auth_level=AuthLevel.ANONYMOUS, + trust_level=0, + deny_reason=DenyReason.BADGE_MISSING, + ) + _cache_put("badge-a", "tool_x", allow) + _cache_put("badge-b", "tool_x", deny) + assert _cache_get("badge-a", "tool_x").decision == Decision.ALLOW + assert _cache_get("badge-b", "tool_x").decision == Decision.DENY + + def test_cache_evicts_at_capacity(self): + """When cache hits max size, oldest entries are evicted.""" + result = GuardResult( + decision=Decision.ALLOW, + agent_did="did:web:example.com:agents:test", + badge_jti="jti-cap", + auth_level=AuthLevel.BADGE, + trust_level=1, + ) + # Fill cache to capacity + for i in range(_DECISION_CACHE_MAX_SIZE): + _cache_put(f"badge-fill-{i}", "tool_cap", result) + assert len(_decision_cache) == _DECISION_CACHE_MAX_SIZE + # Insert one more — should evict oldest + _cache_put("badge-fill-new", "tool_cap", result) + assert len(_decision_cache) == _DECISION_CACHE_MAX_SIZE + # Newest entry is present + assert _cache_get("badge-fill-new", "tool_cap") is not None + # Oldest (badge-fill-0) should have been evicted + assert _cache_get("badge-fill-0", "tool_cap") is None + + def test_cache_evicts_expired_before_oldest(self): + """Expired entries are swept before evicting by age.""" + import time + result = GuardResult( + decision=Decision.ALLOW, + agent_did="did:web:example.com:agents:test", + badge_jti="jti-exp", + auth_level=AuthLevel.BADGE, + trust_level=1, + ) + # Insert one already-expired entry + _decision_cache[("badge-expired-sweep", "tool_exp")] = ( + result, + time.monotonic() - 1, + ) + # Fill remaining slots + for i in range(_DECISION_CACHE_MAX_SIZE - 1): + _cache_put(f"badge-sweep-{i}", "tool_exp", result) + assert len(_decision_cache) == _DECISION_CACHE_MAX_SIZE + # Insert one more — expired entry should be swept, no valid entry lost + _cache_put("badge-sweep-new", "tool_exp", result) + assert _cache_get("badge-sweep-new", "tool_exp") is not None + assert ("badge-expired-sweep", "tool_exp") not in _decision_cache