From 86ece56b3016c9f4473b171688b253f7f3b9c097 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Sat, 17 Jan 2026 10:19:19 -0500 Subject: [PATCH 1/5] fix: align TrustLevel enum with RFC-002 v1.4 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKING CHANGE: Trust level naming corrected to match RFC-002 v1.4. Before (incorrect): - Level 1: DV (Domain Validated) - Level 2: OV (Organization Validated) - Level 3: EV (Extended Validation) - Level 4: CV (Continuous Validation) ← doesn't exist in RFC After (correct per RFC-002 v1.4): - Level 1: REG (Registered) - Level 2: DV (Domain Validated) - Level 3: OV (Organization Validated) - Level 4: EV (Extended Validated) Integer values (0-4) unchanged - this is a naming/documentation fix only. The actual verification logic was already correct. Files updated: - capiscio_mcp/types.py: TrustLevel enum docstring and comments - README.md: Trust level table - docs/index.md: Trust level table - docs/getting-started/quickstart.md: Trust level table - docs/guides/server-side.md: Code comment (CV → EV) Refs: RFC-002 v1.4 §5 Trust Levels --- README.md | 18 ++++++++++-------- capiscio_mcp/types.py | 28 +++++++++++++++------------- docs/getting-started/quickstart.md | 14 ++++++++------ docs/guides/server-side.md | 4 ++-- docs/index.md | 18 ++++++++++-------- 5 files changed, 45 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 46adb9c..05fd263 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ MCP Guard solves this with: | **@guard decorator** | Protect tools with trust-level requirements | | **Evidence logging** | Cryptographic audit trail for every invocation | | **Server identity** | Verify MCP servers before connecting | -| **Trust levels** | 0 (self-signed) → 4 (continuous validation) | +| **Trust levels** | 0 (self-signed) → 4 (extended validation) | ## Quickstart 1: Server-Side (Tool Guarding) @@ -164,13 +164,15 @@ export CAPISCIO_CORE_ADDR="localhost:50051" ## Trust Levels -| Level | Name | Description | -|-------|------|-------------| -| 0 | Self-Signed | `did:key` issuer, cryptographic identity only | -| 1 | Domain Validated (DV) | Domain ownership verified | -| 2 | Organization Validated (OV) | Organization identity verified | -| 3 | Extended Validation (EV) | Legal entity verification | -| 4 | Continuous Validation (CV) | Runtime attestation | +Per RFC-002 v1.4: + +| Level | Name | Validation | Use Case | +|-------|------|------------|----------| +| 0 | Self-Signed (SS) | None, `did:key` issuer | Local dev, testing, demos | +| 1 | Registered (REG) | Account registration | Development, internal agents | +| 2 | Domain Validated (DV) | DNS/HTTP challenge | Production, B2B agents | +| 3 | Organization Validated (OV) | DUNS/legal entity | High-trust production | +| 4 | Extended Validated (EV) | Manual review + legal | Regulated industries | ## Evidence Logging diff --git a/capiscio_mcp/types.py b/capiscio_mcp/types.py index eed182a..c786978 100644 --- a/capiscio_mcp/types.py +++ b/capiscio_mcp/types.py @@ -71,19 +71,21 @@ class DenyReason(str, Enum): class TrustLevel(IntEnum): """ - Trust levels per RFC-002. - - - LEVEL_0: Self-signed (did:key issuer) - - LEVEL_1: Domain Validated (DV) - - LEVEL_2: Organization Validated (OV) - - LEVEL_3: Extended Validation (EV) - - LEVEL_4: Continuous Validation (CV) - """ - LEVEL_0 = 0 # Self-signed - LEVEL_1 = 1 # DV - LEVEL_2 = 2 # OV - LEVEL_3 = 3 # EV - LEVEL_4 = 4 # CV + Trust levels per RFC-002 v1.4. + + - LEVEL_0: Self-Signed (SS) - did:key issuer, no external validation + - LEVEL_1: Registered (REG) - Account registration with CapiscIO Registry + - LEVEL_2: Domain Validated (DV) - DNS/HTTP challenge proving domain control + - LEVEL_3: Organization Validated (OV) - DUNS/legal entity verification + - LEVEL_4: Extended Validated (EV) - Manual review + legal agreement + + See: https://docs.capisc.io/rfcs/002-trust-badge/#5-trust-levels + """ + LEVEL_0 = 0 # Self-Signed (SS) + LEVEL_1 = 1 # Registered (REG) + LEVEL_2 = 2 # Domain Validated (DV) + LEVEL_3 = 3 # Organization Validated (OV) + LEVEL_4 = 4 # Extended Validated (EV) # ============================================================================= diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md index 8ceca92..efc5a1e 100644 --- a/docs/getting-started/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -76,13 +76,15 @@ elif result.state == ServerState.UNVERIFIED_ORIGIN: ## Trust Levels -| Level | Name | Who Issues | Use Case | +Per RFC-002 v1.4: + +| Level | Name | Validation | Use Case | |-------|------|------------|----------| -| 0 | Self-Signed | Agent itself (`did:key`) | Development, testing | -| 1 | Domain Validated (DV) | CapiscIO Registry | Production agents | -| 2 | Organization Validated (OV) | CapiscIO Registry | Business agents | -| 3 | Extended Validation (EV) | CapiscIO Registry | Financial, healthcare | -| 4 | Continuous Validation (CV) | CapiscIO Registry | Critical infrastructure | +| 0 | Self-Signed (SS) | None, `did:key` issuer | Local dev, testing, demos | +| 1 | Registered (REG) | Account registration | Development, internal agents | +| 2 | Domain Validated (DV) | DNS/HTTP challenge | Production, B2B agents | +| 3 | Organization Validated (OV) | DUNS/legal entity | High-trust production | +| 4 | Extended Validated (EV) | Manual review + legal | Regulated industries | ## Next Steps diff --git a/docs/guides/server-side.md b/docs/guides/server-side.md index eeccbbe..4b95422 100644 --- a/docs/guides/server-side.md +++ b/docs/guides/server-side.md @@ -97,12 +97,12 @@ async def read_file(path: str) -> str: @guard(min_trust_level=3) async def write_file(path: str, content: str) -> None: - """High-risk: Write files (EV required).""" + """High-risk: Write files (OV required).""" pass @guard(min_trust_level=4) async def execute_command(cmd: str) -> str: - """Critical: Execute shell commands (CV required).""" + """Critical: Execute shell commands (EV required).""" pass ``` diff --git a/docs/index.md b/docs/index.md index 22132c4..8bc1ccf 100644 --- a/docs/index.md +++ b/docs/index.md @@ -22,7 +22,7 @@ MCP Guard solves this with: | **@guard decorator** | Protect tools with trust-level requirements | | **Evidence logging** | Cryptographic audit trail for every invocation | | **Server identity** | Verify MCP servers before connecting | -| **Trust levels** | 0 (self-signed) → 4 (continuous validation) | +| **Trust levels** | 0 (self-signed) → 4 (extended validation) | ## Quick Example @@ -54,13 +54,15 @@ if result.state == ServerState.VERIFIED_PRINCIPAL: ## Trust Levels -| Level | Name | Description | -|-------|------|-------------| -| 0 | Self-Signed | `did:key` issuer, cryptographic identity only | -| 1 | Domain Validated (DV) | Domain ownership verified | -| 2 | Organization Validated (OV) | Organization identity verified | -| 3 | Extended Validation (EV) | Legal entity verification | -| 4 | Continuous Validation (CV) | Runtime attestation | +Per RFC-002 v1.4: + +| Level | Name | Validation | Use Case | +|-------|------|------------|----------| +| 0 | Self-Signed (SS) | None, `did:key` issuer | Local dev, testing, demos | +| 1 | Registered (REG) | Account registration | Development, internal agents | +| 2 | Domain Validated (DV) | DNS/HTTP challenge | Production, B2B agents | +| 3 | Organization Validated (OV) | DUNS/legal entity | High-trust production | +| 4 | Extended Validated (EV) | Manual review + legal | Regulated industries | ## Next Steps From d15b59449a6ad05da9ede967bd11f83717808b85 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Sat, 17 Jan 2026 18:19:29 -0500 Subject: [PATCH 2/5] docs: add server registration documentation - Add registration module to API reference - Create server-registration.md guide - Update index.md with registration feature - Update README.md with Quickstart 3 and Registration API - Update mkdocs.yml with registration guide in nav - Add registration exports to __init__.py docstring - Add server.py early-return for DECLARED_PRINCIPAL state --- README.md | 55 ++- capiscio_mcp/__init__.py | 30 ++ capiscio_mcp/registration.py | 418 +++++++++++++++++++ capiscio_mcp/server.py | 12 + docs/api-reference.md | 23 ++ docs/guides/server-registration.md | 230 +++++++++++ docs/index.md | 10 + mkdocs.yml | 1 + tests/test_registration.py | 642 +++++++++++++++++++++++++++++ 9 files changed, 1418 insertions(+), 3 deletions(-) create mode 100644 capiscio_mcp/registration.py create mode 100644 docs/guides/server-registration.md create mode 100644 tests/test_registration.py diff --git a/README.md b/README.md index 05fd263..4428e24 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ MCP Guard solves this with: | **@guard decorator** | Protect tools with trust-level requirements | | **Evidence logging** | Cryptographic audit trail for every invocation | | **Server identity** | Verify MCP servers before connecting | +| **Server registration** | Generate keypairs and register server DIDs | | **Trust levels** | 0 (self-signed) → 4 (extended validation) | ## Quickstart 1: Server-Side (Tool Guarding) @@ -137,6 +138,42 @@ async with CapiscioMCPClient( result = await client.call_tool("read_file", {"path": "/data/file.txt"}) ``` +## Quickstart 3: Server Registration + +Register your MCP server's identity with the CapiscIO registry: + +```python +from capiscio_mcp import setup_server_identity + +# One-step setup: generate keys + register with registry +result = await setup_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", # From dashboard + api_key="sk_live_...", # Registry API key + output_dir="./keys", +) + +print(f"Server DID: {result['did']}") +# did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK +print(f"Private key saved to: {result['private_key_path']}") +``` + +### Step-by-Step Registration + +```python +from capiscio_mcp import generate_server_keypair, register_server_identity + +# Step 1: Generate keypair +keys = await generate_server_keypair(output_dir="./keys") + +# Step 2: Register with registry +await register_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", + api_key="sk_live_...", + did=keys["did_key"], + public_key=keys["public_key_pem"], +) +``` + ## Core Connection Modes MCP Guard connects to capiscio-core for cryptographic operations: @@ -266,6 +303,17 @@ config = VerifyConfig( - `VerifyResult` — Verification result dataclass - `ServerVerifyError` — Exception for verification failures +### Registration (Server Identity) + +- `generate_server_keypair(key_id, output_dir)` — Generate Ed25519 keypair +- `generate_server_keypair_sync(...)` — Sync version +- `register_server_identity(server_id, api_key, did, public_key, ca_url)` — Register DID with registry +- `register_server_identity_sync(...)` — Sync version +- `setup_server_identity(server_id, api_key, ca_url, output_dir, key_id)` — Combined setup +- `setup_server_identity_sync(...)` — Sync version +- `RegistrationError` — Exception for registration failures +- `KeyGenerationError` — Exception for key generation failures + ### Types - `Decision` — ALLOW / DENY @@ -279,9 +327,10 @@ config = VerifyConfig( - [RFC-006: MCP Tool Authority and Evidence](https://docs.capisc.io/rfcs/006) - [RFC-007: MCP Server Identity Disclosure](https://docs.capisc.io/rfcs/007) -- [Server-Side Guide](https://docs.capisc.io/mcp/server-side) -- [Client-Side Guide](https://docs.capisc.io/mcp/client-side) -- [Evidence Logging Guide](https://docs.capisc.io/mcp/evidence) +- [Server Registration Guide](https://docs.capisc.io/mcp-guard/guides/server-registration) +- [Server-Side Guide](https://docs.capisc.io/mcp-guard/guides/server-side) +- [Client-Side Guide](https://docs.capisc.io/mcp-guard/guides/client-side) +- [Evidence Logging Guide](https://docs.capisc.io/mcp-guard/guides/evidence) ## Development diff --git a/capiscio_mcp/__init__.py b/capiscio_mcp/__init__.py index 99be378..08e641c 100644 --- a/capiscio_mcp/__init__.py +++ b/capiscio_mcp/__init__.py @@ -7,6 +7,7 @@ This package provides: - @guard decorator for protecting MCP tools with trust-level requirements - Server identity verification for MCP clients +- Server identity registration for MCP servers - PoP (Proof of Possession) handshake for server key verification - Evidence logging for audit and forensics @@ -31,6 +32,16 @@ async def read_database(query: str) -> list[dict]: ) if result.state == ServerState.VERIFIED_PRINCIPAL: print(f"Trusted at level {result.trust_level}") + +Quickstart (Server Registration): + from capiscio_mcp import setup_server_identity + + result = await setup_server_identity( + server_id="your-server-uuid", + api_key="sk_live_...", + output_dir="./keys", + ) + print(f"Server DID: {result['did']}") """ from capiscio_mcp.types import ( @@ -71,6 +82,16 @@ async def read_database(query: str) -> list[dict]: PoPSignatureError, PoPExpiredError, ) +from capiscio_mcp.registration import ( + generate_server_keypair, + generate_server_keypair_sync, + register_server_identity, + register_server_identity_sync, + setup_server_identity, + setup_server_identity_sync, + RegistrationError, + KeyGenerationError, +) from capiscio_mcp._core.version import ( MCP_VERSION, CORE_MIN_VERSION, @@ -118,4 +139,13 @@ async def read_database(query: str) -> list[dict]: "PoPError", "PoPSignatureError", "PoPExpiredError", + # Registration (MCP Server Identity) + "generate_server_keypair", + "generate_server_keypair_sync", + "register_server_identity", + "register_server_identity_sync", + "setup_server_identity", + "setup_server_identity_sync", + "RegistrationError", + "KeyGenerationError", ] diff --git a/capiscio_mcp/registration.py b/capiscio_mcp/registration.py new file mode 100644 index 0000000..5a0f844 --- /dev/null +++ b/capiscio_mcp/registration.py @@ -0,0 +1,418 @@ +""" +MCP Server Identity Registration. + +This module provides functions for MCP servers to: +1. Generate Ed25519 keypairs (deriving did:key) +2. Register their DID with the CapiscIO registry + +This follows the same pattern as agent identity registration in capiscio-sdk-python. + +Usage: + from capiscio_mcp.registration import setup_server_identity + + # One-step setup: generate keys + register with registry + result = await setup_server_identity( + server_id="your-server-uuid", + api_key="sk_live_...", + ca_url="https://registry.capisc.io", + output_dir="./keys", + ) + print(f"Server DID: {result['did']}") + print(f"Private key saved to: {result['private_key_path']}") + + # Or step-by-step: + from capiscio_mcp.registration import generate_server_keypair, register_server_identity + + # Step 1: Generate keypair + keys = await generate_server_keypair(output_dir="./keys") + + # Step 2: Register with registry + await register_server_identity( + server_id="your-server-uuid", + api_key="sk_live_...", + did=keys["did_key"], + public_key=keys["public_key_pem"], + ) +""" + +from __future__ import annotations + +import asyncio +import base64 +import logging +import os +from pathlib import Path +from typing import Optional + +import requests + +from capiscio_mcp.errors import CoreConnectionError + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Errors +# ============================================================================= + + +class RegistrationError(Exception): + """Error during server identity registration.""" + + def __init__(self, message: str, status_code: Optional[int] = None) -> None: + super().__init__(message) + self.status_code = status_code + + +class KeyGenerationError(Exception): + """Error generating keypair.""" + + pass + + +# ============================================================================= +# Key Generation (via capiscio-core gRPC) +# ============================================================================= + + +async def generate_server_keypair( + key_id: str = "", + output_dir: Optional[str] = None, +) -> dict: + """ + Generate Ed25519 keypair for MCP server identity. + + Uses capiscio-core's SimpleGuardService.GenerateKeyPair via gRPC. + The keypair is used for PoP (Proof of Possession) verification. + + Args: + key_id: Optional specific key ID. If empty, one is generated. + output_dir: Optional directory to save private key PEM file. + If provided, saves as {key_id}.pem + + Returns: + dict with: + - key_id: The key identifier + - did_key: The derived did:key URI (e.g., did:key:z6Mk...) + - public_key_pem: PEM-encoded public key + - private_key_pem: PEM-encoded private key + - private_key_path: Path to saved key file (if output_dir provided) + + Raises: + KeyGenerationError: If key generation fails + CoreConnectionError: If connection to capiscio-core fails + + Example: + keys = await generate_server_keypair(output_dir="./keys") + print(f"DID: {keys['did_key']}") + # did:key:z6MkhaXgBZD... + """ + from capiscio_mcp._core.client import CoreClient + from capiscio_mcp._proto.gen.capiscio.v1 import simpleguard_pb2, simpleguard_pb2_grpc + from capiscio_mcp._proto.gen.capiscio.v1 import trust_pb2 + + try: + # Get core client (auto-downloads binary if needed) + client = await CoreClient.get_instance() + + # Create SimpleGuard stub + simpleguard_stub = simpleguard_pb2_grpc.SimpleGuardServiceStub(client._channel) + + # Build request + request = simpleguard_pb2.GenerateKeyPairRequest( + algorithm=trust_pb2.KEY_ALGORITHM_ED25519, + key_id=key_id, + metadata={}, + ) + + # Make RPC call + response = await simpleguard_stub.GenerateKeyPair(request) + + if response.error_message: + raise KeyGenerationError(f"Key generation failed: {response.error_message}") + + result = { + "key_id": response.key_id, + "did_key": response.did_key, + "public_key_pem": response.public_key_pem, + "private_key_pem": response.private_key_pem, + } + + # Save private key to file if output_dir provided + if output_dir: + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + key_filename = f"{response.key_id}.pem" + key_path = output_path / key_filename + + with open(key_path, "w") as f: + f.write(response.private_key_pem) + + # Set restrictive permissions (owner read/write only) + os.chmod(key_path, 0o600) + + result["private_key_path"] = str(key_path) + logger.info(f"Private key saved to {key_path}") + + logger.info(f"Generated keypair with DID: {response.did_key}") + return result + + except CoreConnectionError: + raise + except Exception as e: + raise KeyGenerationError(f"Failed to generate keypair: {e}") from e + + +def generate_server_keypair_sync( + key_id: str = "", + output_dir: Optional[str] = None, +) -> dict: + """ + Sync wrapper for generate_server_keypair. + + See generate_server_keypair() for full documentation. + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop is not None: + import concurrent.futures + + future = asyncio.run_coroutine_threadsafe( + generate_server_keypair(key_id, output_dir), + loop, + ) + return future.result(timeout=60.0) + else: + return asyncio.run(generate_server_keypair(key_id, output_dir)) + + +# ============================================================================= +# Registry Registration (via HTTP) +# ============================================================================= + + +async def register_server_identity( + server_id: str, + api_key: str, + did: str, + public_key: str, + ca_url: str = "https://registry.capisc.io", +) -> dict: + """ + Register MCP server DID with the CapiscIO registry. + + Uses PUT /v1/sdk/servers/{id} to update the server's DID and public key. + This follows the same pattern as agent identity registration. + + Args: + server_id: The MCP server's UUID (from dashboard creation) + api_key: Registry API key (X-Capiscio-Registry-Key) + did: The server's DID (e.g., did:key:z6Mk...) + public_key: PEM-encoded public key + ca_url: Registry URL (default: https://registry.capisc.io) + + Returns: + dict with: + - success: True if registration succeeded + - message: Status message + - data: Updated server object (if successful) + + Raises: + RegistrationError: If registration fails + + Example: + result = await register_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", + api_key="sk_live_abc123...", + did="did:key:z6MkhaXgBZD...", + public_key="-----BEGIN PUBLIC KEY-----...", + ) + """ + # Run in thread pool to avoid blocking async loop + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + _register_server_identity_sync, + server_id, + api_key, + did, + public_key, + ca_url, + ) + + +def _register_server_identity_sync( + server_id: str, + api_key: str, + did: str, + public_key: str, + ca_url: str, +) -> dict: + """Sync implementation of register_server_identity.""" + url = f"{ca_url.rstrip('/')}/v1/sdk/servers/{server_id}" + + headers = { + "Content-Type": "application/json", + "X-Capiscio-Registry-Key": api_key, + } + + payload = { + "did": did, + "publicKey": public_key, + } + + try: + response = requests.put(url, json=payload, headers=headers, timeout=30) + + if response.status_code == 200: + data = response.json() + logger.info(f"Successfully registered server identity: {did}") + return { + "success": True, + "message": "Server identity registered successfully", + "data": data.get("data"), + } + elif response.status_code == 400: + error_data = response.json() + error_msg = error_data.get("message", "Invalid request") + raise RegistrationError(f"Bad request: {error_msg}", status_code=400) + elif response.status_code == 401: + raise RegistrationError("Invalid API key", status_code=401) + elif response.status_code == 404: + raise RegistrationError(f"Server not found: {server_id}", status_code=404) + else: + raise RegistrationError( + f"Registration failed with status {response.status_code}", + status_code=response.status_code, + ) + + except requests.RequestException as e: + raise RegistrationError(f"Network error: {e}") from e + + +def register_server_identity_sync( + server_id: str, + api_key: str, + did: str, + public_key: str, + ca_url: str = "https://registry.capisc.io", +) -> dict: + """ + Sync wrapper for register_server_identity. + + See register_server_identity() for full documentation. + """ + return _register_server_identity_sync(server_id, api_key, did, public_key, ca_url) + + +# ============================================================================= +# Convenience: Combined Setup +# ============================================================================= + + +async def setup_server_identity( + server_id: str, + api_key: str, + ca_url: str = "https://registry.capisc.io", + output_dir: Optional[str] = None, + key_id: str = "", +) -> dict: + """ + Generate keypair and register server identity in one call. + + This is the recommended way to set up MCP server identity: + 1. Generates Ed25519 keypair via capiscio-core + 2. Registers the DID with the CapiscIO registry + 3. Optionally saves the private key to disk + + Args: + server_id: The MCP server's UUID (from dashboard creation) + api_key: Registry API key (X-Capiscio-Registry-Key) + ca_url: Registry URL (default: https://registry.capisc.io) + output_dir: Optional directory to save private key PEM file + key_id: Optional specific key ID. If empty, one is generated. + + Returns: + dict with: + - did: The server's DID (did:key:z6Mk...) + - public_key_pem: PEM-encoded public key + - private_key_pem: PEM-encoded private key + - private_key_path: Path to saved key (if output_dir provided) + - key_id: The key identifier + + Raises: + KeyGenerationError: If key generation fails + RegistrationError: If registry registration fails + CoreConnectionError: If connection to capiscio-core fails + + Example: + result = await setup_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", + api_key="sk_live_abc123...", + output_dir="./keys", + ) + + # Use result['private_key_pem'] for PoP signing + # Save result['did'] for server identity disclosure + """ + # Step 1: Generate keypair + logger.info(f"Generating keypair for server {server_id}...") + keys = await generate_server_keypair(key_id=key_id, output_dir=output_dir) + + # Step 2: Register with registry + logger.info(f"Registering DID {keys['did_key']} with registry...") + await register_server_identity( + server_id=server_id, + api_key=api_key, + did=keys["did_key"], + public_key=keys["public_key_pem"], + ca_url=ca_url, + ) + + result = { + "did": keys["did_key"], + "public_key_pem": keys["public_key_pem"], + "private_key_pem": keys["private_key_pem"], + "key_id": keys["key_id"], + } + + if "private_key_path" in keys: + result["private_key_path"] = keys["private_key_path"] + + logger.info(f"Server identity setup complete: {keys['did_key']}") + return result + + +def setup_server_identity_sync( + server_id: str, + api_key: str, + ca_url: str = "https://registry.capisc.io", + output_dir: Optional[str] = None, + key_id: str = "", +) -> dict: + """ + Sync wrapper for setup_server_identity. + + See setup_server_identity() for full documentation. + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop is not None: + import concurrent.futures + + future = asyncio.run_coroutine_threadsafe( + setup_server_identity(server_id, api_key, ca_url, output_dir, key_id), + loop, + ) + return future.result(timeout=120.0) + else: + return asyncio.run( + setup_server_identity(server_id, api_key, ca_url, output_dir, key_id) + ) diff --git a/capiscio_mcp/server.py b/capiscio_mcp/server.py index c5ed17f..608503f 100644 --- a/capiscio_mcp/server.py +++ b/capiscio_mcp/server.py @@ -142,6 +142,18 @@ async def verify_server( error_code=ServerErrorCode.NONE, ) + # Quick path: DID disclosed but no badge → DECLARED_PRINCIPAL + # Per RFC-007 §7.2: No badge means identity is claimed but not verified + # This path doesn't require gRPC connection to capiscio-core + if not server_badge: + logger.debug(f"Server disclosed DID ({server_did}) but no badge (DECLARED_PRINCIPAL)") + return VerifyResult( + state=ServerState.DECLARED_PRINCIPAL, + server_did=server_did, + error_code=ServerErrorCode.NONE, + ) + + # Full verification path: DID + badge requires gRPC validation # Get core client client = await CoreClient.get_instance() diff --git a/docs/api-reference.md b/docs/api-reference.md index 79671b8..849a4c8 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -21,6 +21,14 @@ This section provides detailed API documentation for all public modules in capis - DenyReason - ServerState - ServerErrorCode + - generate_server_keypair + - generate_server_keypair_sync + - register_server_identity + - register_server_identity_sync + - setup_server_identity + - setup_server_identity_sync + - RegistrationError + - KeyGenerationError show_root_heading: false ## Guard Module (RFC-006) @@ -91,6 +99,21 @@ This section provides detailed API documentation for all public modules in capis - CoreVersionError show_root_heading: false +## Registration Module (Server Identity) + +::: capiscio_mcp.registration + options: + members: + - generate_server_keypair + - generate_server_keypair_sync + - register_server_identity + - register_server_identity_sync + - setup_server_identity + - setup_server_identity_sync + - RegistrationError + - KeyGenerationError + show_root_heading: false + ## MCP SDK Integration ::: capiscio_mcp.integrations.mcp diff --git a/docs/guides/server-registration.md b/docs/guides/server-registration.md new file mode 100644 index 0000000..117ba76 --- /dev/null +++ b/docs/guides/server-registration.md @@ -0,0 +1,230 @@ +````markdown +# Server Identity Registration + +This guide covers setting up a verifiable identity for your MCP server. + +## Why Server Identity? + +MCP servers expose powerful tools—file systems, databases, APIs. But how do clients know they're connecting to the **real** server and not an imposter? + +Server identity registration solves this by: + +- **Generating a keypair** for cryptographic signing +- **Creating a DID** (Decentralized Identifier) for the server +- **Registering with the CapiscIO Registry** for discoverability + +## Quick Start + +```python +from capiscio_mcp import setup_server_identity + +# One-step setup: generate keys + register with registry +result = await setup_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", # From dashboard + api_key="sk_live_...", # Registry API key + output_dir="./keys", +) + +print(f"Server DID: {result['did']}") +# did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK +``` + +## Prerequisites + +1. **Create your MCP server in the CapiscIO Dashboard** + - Go to [dashboard.capisc.io](https://dashboard.capisc.io) + - Navigate to Servers → Create Server + - Copy the server UUID + +2. **Get an API key** + - In the dashboard, go to Settings → API Keys + - Create a key with `servers:write` permission + +3. **capiscio-core must be running** + - Either embedded (auto-downloaded) or external + +## Step-by-Step Registration + +### Step 1: Generate Keypair + +Generate an Ed25519 keypair for your server: + +```python +from capiscio_mcp import generate_server_keypair + +keys = await generate_server_keypair(output_dir="./keys") + +print(f"DID: {keys['did_key']}") +print(f"Private key: {keys['private_key_path']}") +``` + +Returns: + +| Key | Description | +|-----|-------------| +| `key_id` | Unique key identifier | +| `did_key` | The derived `did:key:z6Mk...` URI | +| `public_key_pem` | PEM-encoded public key | +| `private_key_pem` | PEM-encoded private key | +| `private_key_path` | Path to saved key file (if `output_dir` provided) | + +### Step 2: Register with Registry + +Register the DID with the CapiscIO registry: + +```python +from capiscio_mcp import register_server_identity + +await register_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", + api_key="sk_live_...", + did=keys["did_key"], + public_key=keys["public_key_pem"], +) +``` + +### Combined: setup_server_identity + +For convenience, use the combined function: + +```python +from capiscio_mcp import setup_server_identity + +result = await setup_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", + api_key="sk_live_...", + output_dir="./keys", +) + +# Returns everything you need +print(f"DID: {result['did']}") +print(f"Private key: {result['private_key_path']}") +``` + +## Synchronous API + +All functions have sync wrappers: + +```python +from capiscio_mcp import ( + generate_server_keypair_sync, + register_server_identity_sync, + setup_server_identity_sync, +) + +# Sync version +result = setup_server_identity_sync( + server_id="550e8400-e29b-41d4-a716-446655440000", + api_key="sk_live_...", + output_dir="./keys", +) +``` + +## Using the Identity + +After registration, use the DID and private key for server identity disclosure: + +### With CapiscioMCPServer + +```python +from capiscio_mcp.integrations.mcp import CapiscioMCPServer + +server = CapiscioMCPServer( + name="filesystem", + did=result["did"], + private_key_path=result["private_key_path"], +) + +@server.tool(min_trust_level=2) +async def read_file(path: str) -> str: + """Server identity is automatically disclosed.""" + with open(path) as f: + return f.read() +``` + +### Manual Disclosure + +Add identity headers to responses: + +```python +from fastapi import FastAPI, Response + +app = FastAPI() + +@app.middleware("http") +async def add_server_identity(request, call_next): + response = await call_next(request) + response.headers["Capiscio-Server-DID"] = SERVER_DID + response.headers["Capiscio-Server-Badge"] = SERVER_BADGE + return response +``` + +## Error Handling + +```python +from capiscio_mcp import ( + setup_server_identity, + RegistrationError, + KeyGenerationError, +) +from capiscio_mcp.errors import CoreConnectionError + +try: + result = await setup_server_identity( + server_id="550e8400-e29b-41d4-a716-446655440000", + api_key="sk_live_...", + ) +except CoreConnectionError as e: + print("Could not connect to capiscio-core") + print("Ensure it's running: capiscio mcp serve") +except KeyGenerationError as e: + print(f"Key generation failed: {e}") +except RegistrationError as e: + print(f"Registration failed: {e}") + if e.status_code == 401: + print("Invalid API key") + elif e.status_code == 404: + print("Server not found - create it in the dashboard first") +``` + +## Environment Variables + +| Variable | Description | +|----------|-------------| +| `CAPISCIO_CORE_ADDR` | External core address (default: embedded) | +| `CAPISCIO_SERVER_DID` | Pre-configured server DID | +| `CAPISCIO_SERVER_PRIVATE_KEY` | Path to private key PEM | + +## Security Best Practices + +1. **Never commit private keys** + ```gitignore + # .gitignore + *.pem + keys/ + capiscio_keys/ + ``` + +2. **Use restrictive permissions** + ```bash + chmod 600 ./keys/*.pem + ``` + +3. **Rotate keys periodically** + - Generate new keypair + - Update registry with new DID + - Keep old key for transition period + +4. **Store API keys securely** + ```bash + # Use environment variables + export CAPISCIO_REGISTRY_API_KEY="sk_live_..." + ``` + +## Next Steps + +- [Protect MCP Tools](server-side.md) - Add trust-level requirements +- [Client-Side Verification](client-side.md) - Verify servers before connecting +- [Evidence Logging](evidence.md) - Audit trail for all tool calls + +```` \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 8bc1ccf..1685d95 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,6 +14,7 @@ MCP servers expose powerful tools to autonomous agents—file systems, databases - **Authenticate** which agent is calling a tool - **Authorize** whether that agent should have access - **Audit** what happened for post-incident review +- **Identify** which server the client is connecting to MCP Guard solves this with: @@ -22,6 +23,7 @@ MCP Guard solves this with: | **@guard decorator** | Protect tools with trust-level requirements | | **Evidence logging** | Cryptographic audit trail for every invocation | | **Server identity** | Verify MCP servers before connecting | +| **Server registration** | Generate keypairs and register server DIDs | | **Trust levels** | 0 (self-signed) → 4 (extended validation) | ## Quick Example @@ -84,6 +86,14 @@ Per RFC-002 v1.4: [:octicons-arrow-right-24: Quickstart](getting-started/quickstart.md) +- :material-key:{ .lg .middle } **Server Registration** + + --- + + Generate a keypair and register your server's DID. + + [:octicons-arrow-right-24: Server Registration](guides/server-registration.md) + - :material-shield-check:{ .lg .middle } **Server-Side Guide** --- diff --git a/mkdocs.yml b/mkdocs.yml index 8d66cd7..6c9f8f2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -65,6 +65,7 @@ nav: - Installation: getting-started/installation.md - Quickstart: getting-started/quickstart.md - Guides: + - Server Registration: guides/server-registration.md - Server-Side (Guarding Tools): guides/server-side.md - Client-Side (Verifying Servers): guides/client-side.md - Evidence Logging: guides/evidence.md diff --git a/tests/test_registration.py b/tests/test_registration.py new file mode 100644 index 0000000..0e2a391 --- /dev/null +++ b/tests/test_registration.py @@ -0,0 +1,642 @@ +""" +Tests for capiscio_mcp.registration module. + +Tests MCP server identity registration following the agent DID pattern. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +import json + + +class TestKeyGenerationErrors: + """Tests for key generation error handling.""" + + @pytest.mark.asyncio + async def test_generate_keypair_grpc_error(self): + """Raise KeyGenerationError on gRPC failure.""" + from capiscio_mcp.registration import ( + generate_server_keypair, + KeyGenerationError, + ) + + with patch("capiscio_mcp._core.client.CoreClient") as mock_client_class: + mock_instance = AsyncMock() + mock_instance._channel = MagicMock() + mock_client_class.get_instance = AsyncMock(return_value=mock_instance) + + # Simulate gRPC error via async stub + mock_stub = AsyncMock() + mock_stub.GenerateKeyPair.side_effect = Exception("gRPC error") + + with patch.dict("sys.modules", { + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2": MagicMock(), + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2_grpc": MagicMock( + SimpleGuardServiceStub=MagicMock(return_value=mock_stub) + ), + "capiscio_mcp._proto.gen.capiscio.v1.trust_pb2": MagicMock(), + }): + with pytest.raises(KeyGenerationError): + await generate_server_keypair() + + +class TestKeyGenerationSuccess: + """Tests for successful key generation.""" + + @pytest.mark.asyncio + async def test_generate_keypair_success(self): + """Successfully generate a keypair.""" + from capiscio_mcp.registration import generate_server_keypair + + # Create mock response + mock_response = MagicMock() + mock_response.key_id = "test-key-id-123" + mock_response.public_key_pem = "-----BEGIN PUBLIC KEY-----\nMCowBQ..." + mock_response.private_key_pem = "-----BEGIN PRIVATE KEY-----\nMC4CA..." + mock_response.did_key = "did:key:z6MkTestKeyHere" + mock_response.error_message = "" + + mock_stub = AsyncMock() + mock_stub.GenerateKeyPair.return_value = mock_response + + with patch("capiscio_mcp._core.client.CoreClient") as mock_client_class: + mock_instance = AsyncMock() + mock_instance._channel = MagicMock() + mock_client_class.get_instance = AsyncMock(return_value=mock_instance) + + with patch.dict("sys.modules", { + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2": MagicMock(), + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2_grpc": MagicMock( + SimpleGuardServiceStub=MagicMock(return_value=mock_stub) + ), + "capiscio_mcp._proto.gen.capiscio.v1.trust_pb2": MagicMock(), + }): + result = await generate_server_keypair() + + assert result["key_id"] == "test-key-id-123" + assert result["public_key_pem"] == mock_response.public_key_pem + assert result["private_key_pem"] == mock_response.private_key_pem + assert result["did_key"] == "did:key:z6MkTestKeyHere" + + +class TestRegistrationErrors: + """Tests for registration error handling.""" + + @pytest.mark.asyncio + async def test_register_missing_params(self): + """Test registration with empty params.""" + from capiscio_mcp.registration import ( + register_server_identity, + RegistrationError, + ) + + # Test with a mock that returns 400 + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 400 + mock_response.json.return_value = {"message": "Bad request"} + mock_put.return_value = mock_response + + with pytest.raises(RegistrationError, match="Bad request"): + await register_server_identity( + server_id="test-server", + did="did:key:z6MkTest", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="test-key", + ca_url="https://api.capisc.io", + ) + + @pytest.mark.asyncio + async def test_register_unauthorized(self): + """Raise RegistrationError on 401.""" + from capiscio_mcp.registration import ( + register_server_identity, + RegistrationError, + ) + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 401 + mock_put.return_value = mock_response + + with pytest.raises(RegistrationError, match="Invalid API key"): + await register_server_identity( + server_id="test-server", + did="did:key:z6MkTest", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="invalid-key", + ca_url="https://api.capisc.io", + ) + + +class TestRegistrationSuccess: + """Tests for successful registration.""" + + @pytest.mark.asyncio + async def test_register_server_identity_success(self): + """Successfully register server identity.""" + from capiscio_mcp.registration import register_server_identity + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "data": { + "id": "test-server", + "did": "did:key:z6MkTest", + "publicKey": "-----BEGIN PUBLIC KEY-----\ntest", + } + } + mock_put.return_value = mock_response + + result = await register_server_identity( + server_id="test-server", + did="did:key:z6MkTest", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="valid-key", + ca_url="https://api.capisc.io", + ) + + assert result["success"] is True + assert result["data"]["did"] == "did:key:z6MkTest" + + # Verify the PUT was called correctly + mock_put.assert_called_once() + call_args = mock_put.call_args + assert "/v1/sdk/servers/test-server" in call_args[0][0] + assert call_args[1]["headers"]["X-Capiscio-Registry-Key"] == "valid-key" + + def test_register_server_identity_sync(self): + """Test synchronous registration wrapper.""" + from capiscio_mcp.registration import register_server_identity_sync + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": {"id": "sync-server"}} + mock_put.return_value = mock_response + + result = register_server_identity_sync( + server_id="sync-server", + did="did:key:z6MkSync", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="test-key", + ca_url="https://api.capisc.io", + ) + + assert result["success"] is True + + +class TestSetupServerIdentity: + """Tests for the combined setup_server_identity function.""" + + @pytest.mark.asyncio + async def test_setup_server_identity_success(self): + """Successfully setup server identity end-to-end.""" + from capiscio_mcp.registration import setup_server_identity + + # Mock key generation response + mock_key_response = MagicMock() + mock_key_response.key_id = "generated-key-id" + mock_key_response.public_key_pem = "-----BEGIN PUBLIC KEY-----\ngenerated" + mock_key_response.private_key_pem = "-----BEGIN PRIVATE KEY-----\ngenerated" + mock_key_response.did_key = "did:key:z6MkGenerated" + mock_key_response.error_message = "" + + mock_stub = AsyncMock() + mock_stub.GenerateKeyPair.return_value = mock_key_response + + with patch("capiscio_mcp._core.client.CoreClient") as mock_client_class: + mock_instance = AsyncMock() + mock_instance._channel = MagicMock() + mock_client_class.get_instance = AsyncMock(return_value=mock_instance) + + with patch.dict("sys.modules", { + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2": MagicMock(), + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2_grpc": MagicMock( + SimpleGuardServiceStub=MagicMock(return_value=mock_stub) + ), + "capiscio_mcp._proto.gen.capiscio.v1.trust_pb2": MagicMock(), + }): + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_reg_response = MagicMock() + mock_reg_response.status_code = 200 + mock_reg_response.json.return_value = { + "data": { + "id": "my-server", + "did": "did:key:z6MkGenerated", + } + } + mock_put.return_value = mock_reg_response + + result = await setup_server_identity( + server_id="my-server", + api_key="my-api-key", + ca_url="https://api.capisc.io", + ) + + # Check result contains keypair and registration info + assert result["did"] == "did:key:z6MkGenerated" + assert result["private_key_pem"] == "-----BEGIN PRIVATE KEY-----\ngenerated" + assert result["key_id"] == "generated-key-id" + + +class TestRegistrationIdempotency: + """Tests for idempotent registration behavior.""" + + @pytest.mark.asyncio + async def test_register_twice_same_identity(self): + """Registering the same identity twice should succeed (PUT is idempotent).""" + from capiscio_mcp.registration import register_server_identity + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + # Both calls return success + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "data": { + "id": "idempotent-server", + "did": "did:key:z6MkSame", + } + } + mock_put.return_value = mock_response + + # First registration + result1 = await register_server_identity( + server_id="idempotent-server", + did="did:key:z6MkSame", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="key", + ca_url="https://api.capisc.io", + ) + + # Second registration with same data + result2 = await register_server_identity( + server_id="idempotent-server", + did="did:key:z6MkSame", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="key", + ca_url="https://api.capisc.io", + ) + + assert result1["success"] == result2["success"] + assert mock_put.call_count == 2 + + +class TestExceptionTypes: + """Tests for exception type handling.""" + + def test_registration_error_inheritance(self): + """RegistrationError should be a proper Exception.""" + from capiscio_mcp.registration import RegistrationError + + error = RegistrationError("test message") + assert isinstance(error, Exception) + assert str(error) == "test message" + + def test_key_generation_error_inheritance(self): + """KeyGenerationError should be a proper Exception.""" + from capiscio_mcp.registration import KeyGenerationError + + error = KeyGenerationError("key gen failed") + assert isinstance(error, Exception) + assert str(error) == "key gen failed" + + def test_registration_error_has_status_code(self): + """RegistrationError should accept status_code.""" + from capiscio_mcp.registration import RegistrationError + + error = RegistrationError("test", status_code=401) + assert error.status_code == 401 + + +class TestRequestPayload: + """Tests for the correct request payload format.""" + + @pytest.mark.asyncio + async def test_request_payload_structure(self): + """Verify the PUT request payload has correct structure.""" + from capiscio_mcp.registration import register_server_identity + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": {"id": "test"}} + mock_put.return_value = mock_response + + await register_server_identity( + server_id="payload-test", + did="did:key:z6MkPayload", + public_key="-----BEGIN PUBLIC KEY-----\npayload", + api_key="test-key", + ca_url="https://api.capisc.io", + ) + + # Check the JSON payload + call_args = mock_put.call_args + json_payload = call_args[1]["json"] + + assert json_payload["did"] == "did:key:z6MkPayload" + assert json_payload["publicKey"] == "-----BEGIN PUBLIC KEY-----\npayload" + + @pytest.mark.asyncio + async def test_request_headers(self): + """Verify the request headers are correct.""" + from capiscio_mcp.registration import register_server_identity + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": {"id": "test"}} + mock_put.return_value = mock_response + + await register_server_identity( + server_id="header-test", + did="did:key:z6MkHeader", + public_key="-----BEGIN PUBLIC KEY-----\nheader", + api_key="my-special-key", + ca_url="https://api.capisc.io", + ) + + call_args = mock_put.call_args + headers = call_args[1]["headers"] + + assert headers["X-Capiscio-Registry-Key"] == "my-special-key" + assert headers["Content-Type"] == "application/json" + + @pytest.mark.asyncio + async def test_request_url_construction(self): + """Verify URL is constructed correctly.""" + from capiscio_mcp.registration import register_server_identity + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": {"id": "test"}} + mock_put.return_value = mock_response + + await register_server_identity( + server_id="url-test-server", + did="did:key:z6MkUrl", + public_key="-----BEGIN PUBLIC KEY-----\nurl", + api_key="key", + ca_url="https://api.capisc.io", + ) + + call_args = mock_put.call_args + url = call_args[0][0] + + assert url == "https://api.capisc.io/v1/sdk/servers/url-test-server" + + @pytest.mark.asyncio + async def test_ca_url_trailing_slash(self): + """Handle ca_url with trailing slash.""" + from capiscio_mcp.registration import register_server_identity + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": {"id": "test"}} + mock_put.return_value = mock_response + + await register_server_identity( + server_id="slash-test", + did="did:key:z6MkSlash", + public_key="-----BEGIN PUBLIC KEY-----\nslash", + api_key="key", + ca_url="https://api.capisc.io/", # Note trailing slash + ) + + call_args = mock_put.call_args + url = call_args[0][0] + + # Should not have double slash + assert "//" not in url.replace("https://", "") + + +class TestNetworkErrors: + """Tests for network error handling.""" + + @pytest.mark.asyncio + async def test_register_network_timeout(self): + """Raise RegistrationError on network timeout.""" + from capiscio_mcp.registration import ( + register_server_identity, + RegistrationError, + ) + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + import requests as req_lib + mock_put.side_effect = req_lib.exceptions.Timeout("Connection timed out") + + with pytest.raises(RegistrationError, match="Network error"): + await register_server_identity( + server_id="timeout-test", + did="did:key:z6MkTimeout", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="key", + ca_url="https://api.capisc.io", + ) + + @pytest.mark.asyncio + async def test_register_connection_error(self): + """Raise RegistrationError on connection failure.""" + from capiscio_mcp.registration import ( + register_server_identity, + RegistrationError, + ) + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + import requests as req_lib + mock_put.side_effect = req_lib.exceptions.ConnectionError("Connection refused") + + with pytest.raises(RegistrationError, match="Network error"): + await register_server_identity( + server_id="conn-error-test", + did="did:key:z6MkConnError", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="key", + ca_url="https://api.capisc.io", + ) + + +class TestServerNotFound: + """Tests for server not found handling.""" + + @pytest.mark.asyncio + async def test_register_server_not_found(self): + """Raise RegistrationError when server ID doesn't exist.""" + from capiscio_mcp.registration import ( + register_server_identity, + RegistrationError, + ) + + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 404 + mock_put.return_value = mock_response + + with pytest.raises(RegistrationError, match="not found"): + await register_server_identity( + server_id="nonexistent-server", + did="did:key:z6MkNotFound", + public_key="-----BEGIN PUBLIC KEY-----\ntest", + api_key="key", + ca_url="https://api.capisc.io", + ) + + +class TestKeyGenerationWithErrorMessage: + """Tests for key generation when server returns error message.""" + + @pytest.mark.asyncio + async def test_generate_keypair_server_error_message(self): + """Raise KeyGenerationError when server returns error_message.""" + from capiscio_mcp.registration import ( + generate_server_keypair, + KeyGenerationError, + ) + + # Create mock response with error_message + mock_response = MagicMock() + mock_response.key_id = "" + mock_response.public_key_pem = "" + mock_response.private_key_pem = "" + mock_response.did_key = "" + mock_response.error_message = "Key generation failed: invalid algorithm" + + mock_stub = AsyncMock() + mock_stub.GenerateKeyPair.return_value = mock_response + + with patch("capiscio_mcp._core.client.CoreClient") as mock_client_class: + mock_instance = AsyncMock() + mock_instance._channel = MagicMock() + mock_client_class.get_instance = AsyncMock(return_value=mock_instance) + + with patch.dict("sys.modules", { + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2": MagicMock(), + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2_grpc": MagicMock( + SimpleGuardServiceStub=MagicMock(return_value=mock_stub) + ), + "capiscio_mcp._proto.gen.capiscio.v1.trust_pb2": MagicMock(), + }): + with pytest.raises(KeyGenerationError, match="Key generation failed"): + await generate_server_keypair() + + +class TestOutputDirSaving: + """Tests for saving private key to output directory.""" + + @pytest.mark.asyncio + async def test_generate_keypair_saves_to_output_dir(self, tmp_path): + """Successfully save private key to output directory.""" + from capiscio_mcp.registration import generate_server_keypair + + # Create mock response + mock_response = MagicMock() + mock_response.key_id = "saved-key-id" + mock_response.public_key_pem = "-----BEGIN PUBLIC KEY-----\ntest" + mock_response.private_key_pem = "-----BEGIN PRIVATE KEY-----\nsecret" + mock_response.did_key = "did:key:z6MkSaved" + mock_response.error_message = "" + + mock_stub = AsyncMock() + mock_stub.GenerateKeyPair.return_value = mock_response + + with patch("capiscio_mcp._core.client.CoreClient") as mock_client_class: + mock_instance = AsyncMock() + mock_instance._channel = MagicMock() + mock_client_class.get_instance = AsyncMock(return_value=mock_instance) + + with patch.dict("sys.modules", { + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2": MagicMock(), + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2_grpc": MagicMock( + SimpleGuardServiceStub=MagicMock(return_value=mock_stub) + ), + "capiscio_mcp._proto.gen.capiscio.v1.trust_pb2": MagicMock(), + }): + result = await generate_server_keypair(output_dir=str(tmp_path)) + + # Check result includes path + assert "private_key_path" in result + + # Verify file was created (mocked, but path should exist in result) + assert result["private_key_path"].endswith(".pem") + + +class TestSetupServerIdentityErrors: + """Tests for setup_server_identity error scenarios.""" + + @pytest.mark.asyncio + async def test_setup_fails_on_key_generation_error(self): + """setup_server_identity should fail if key generation fails.""" + from capiscio_mcp.registration import ( + setup_server_identity, + KeyGenerationError, + ) + + mock_stub = AsyncMock() + mock_stub.GenerateKeyPair.side_effect = Exception("gRPC unavailable") + + with patch("capiscio_mcp._core.client.CoreClient") as mock_client_class: + mock_instance = AsyncMock() + mock_instance._channel = MagicMock() + mock_client_class.get_instance = AsyncMock(return_value=mock_instance) + + with patch.dict("sys.modules", { + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2": MagicMock(), + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2_grpc": MagicMock( + SimpleGuardServiceStub=MagicMock(return_value=mock_stub) + ), + "capiscio_mcp._proto.gen.capiscio.v1.trust_pb2": MagicMock(), + }): + with pytest.raises(KeyGenerationError): + await setup_server_identity( + server_id="fail-test", + api_key="key", + ca_url="https://api.capisc.io", + ) + + @pytest.mark.asyncio + async def test_setup_fails_on_registration_error(self): + """setup_server_identity should fail if registration fails.""" + from capiscio_mcp.registration import ( + setup_server_identity, + RegistrationError, + ) + + # Mock successful key generation + mock_key_response = MagicMock() + mock_key_response.key_id = "gen-key" + mock_key_response.public_key_pem = "-----BEGIN PUBLIC KEY-----\ntest" + mock_key_response.private_key_pem = "-----BEGIN PRIVATE KEY-----\ntest" + mock_key_response.did_key = "did:key:z6MkGenKey" + mock_key_response.error_message = "" + + mock_stub = AsyncMock() + mock_stub.GenerateKeyPair.return_value = mock_key_response + + with patch("capiscio_mcp._core.client.CoreClient") as mock_client_class: + mock_instance = AsyncMock() + mock_instance._channel = MagicMock() + mock_client_class.get_instance = AsyncMock(return_value=mock_instance) + + with patch.dict("sys.modules", { + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2": MagicMock(), + "capiscio_mcp._proto.gen.capiscio.v1.simpleguard_pb2_grpc": MagicMock( + SimpleGuardServiceStub=MagicMock(return_value=mock_stub) + ), + "capiscio_mcp._proto.gen.capiscio.v1.trust_pb2": MagicMock(), + }): + # Mock failing registration + with patch("capiscio_mcp.registration.requests.put") as mock_put: + mock_response = MagicMock() + mock_response.status_code = 500 + mock_put.return_value = mock_response + + with pytest.raises(RegistrationError): + await setup_server_identity( + server_id="fail-reg-test", + api_key="key", + ca_url="https://api.capisc.io", + ) From 55573981acd26db2ad8f8bb65a1b99620ce94c31 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Sat, 17 Jan 2026 19:30:27 -0500 Subject: [PATCH 3/5] fix: integration-tests.yml use --version flag The capiscio CLI uses --version, not 'version' subcommand --- .github/workflows/integration-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 0a8aa09..1d27f88 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -54,7 +54,7 @@ jobs: chmod +x bin/capiscio echo "Built capiscio binary:" ls -la bin/capiscio - ./bin/capiscio version + ./bin/capiscio --version - name: Set up Python uses: actions/setup-python@v5 From 0231e8087adb58a98e1000b2f387744a6b07e0e2 Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Sat, 17 Jan 2026 19:45:40 -0500 Subject: [PATCH 4/5] fix(tests): remove broken ContextVar patches in guard tests The _caller_badge is a ContextVar, not a function, so patch() with return_value does not work. The mock RPC response already provides all needed data, so these patches were unnecessary. --- tests/test_guard.py | 125 +++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 66 deletions(-) diff --git a/tests/test_guard.py b/tests/test_guard.py index 1995a08..1a77c42 100644 --- a/tests/test_guard.py +++ b/tests/test_guard.py @@ -210,13 +210,12 @@ async def test_guard_allows_valid_badge(self, mock_core_client, sample_badge_jws mock_core_client.stub.EvaluateToolAccess = AsyncMock(return_value=mock_response) with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client): - with patch("capiscio_mcp.guard._caller_badge", return_value=sample_badge_jws): - @guard(min_trust_level=2) - async def read_file(path: str) -> str: - return f"Contents of {path}" - - result = await read_file(path="/tmp/test.txt") - assert result == "Contents of /tmp/test.txt" + @guard(min_trust_level=2) + async def read_file(path: str) -> str: + return f"Contents of {path}" + + result = await read_file(path="/tmp/test.txt") + assert result == "Contents of /tmp/test.txt" @pytest.mark.asyncio async def test_guard_denies_insufficient_trust(self, mock_core_client, sample_badge_jws): @@ -235,16 +234,15 @@ async def test_guard_denies_insufficient_trust(self, mock_core_client, sample_ba mock_core_client.stub.EvaluateToolAccess = AsyncMock(return_value=mock_response) with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client): - with patch("capiscio_mcp.guard._caller_badge", return_value=sample_badge_jws): - @guard(min_trust_level=2) - async def read_file(path: str) -> str: - return f"Contents of {path}" - - with pytest.raises(GuardError) as exc_info: - await read_file(path="/tmp/test.txt") - - assert exc_info.value.reason == DenyReason.TRUST_INSUFFICIENT - assert "evidence_id" in dir(exc_info.value) + @guard(min_trust_level=2) + async def read_file(path: str) -> str: + return f"Contents of {path}" + + with pytest.raises(GuardError) as exc_info: + await read_file(path="/tmp/test.txt") + + assert exc_info.value.reason == DenyReason.TRUST_INSUFFICIENT + assert "evidence_id" in dir(exc_info.value) @pytest.mark.asyncio async def test_guard_denies_missing_badge(self, mock_core_client): @@ -263,15 +261,14 @@ async def test_guard_denies_missing_badge(self, mock_core_client): mock_core_client.stub.EvaluateToolAccess = AsyncMock(return_value=mock_response) with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client): - with patch("capiscio_mcp.guard._caller_badge", return_value=None): - @guard(min_trust_level=1) - async def read_file(path: str) -> str: - return f"Contents of {path}" - - with pytest.raises(GuardError) as exc_info: - await read_file(path="/tmp/test.txt") - - assert exc_info.value.reason == DenyReason.BADGE_MISSING + @guard(min_trust_level=1) + async def read_file(path: str) -> str: + return f"Contents of {path}" + + with pytest.raises(GuardError) as exc_info: + await read_file(path="/tmp/test.txt") + + assert exc_info.value.reason == DenyReason.BADGE_MISSING @pytest.mark.asyncio async def test_guard_uses_function_name_as_tool_name(self, mock_core_client, sample_badge_jws): @@ -290,18 +287,17 @@ async def test_guard_uses_function_name_as_tool_name(self, mock_core_client, sam mock_core_client.stub.EvaluateToolAccess = evaluate_mock with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client): - with patch("capiscio_mcp.guard._caller_badge", return_value=sample_badge_jws): - @guard() - async def my_custom_tool(arg: str) -> str: - return arg - - await my_custom_tool(arg="test") - - # Verify tool_name was passed correctly - call_args = evaluate_mock.call_args - assert call_args is not None - request = call_args[0][0] - assert request.tool_name == "my_custom_tool" + @guard() + async def my_custom_tool(arg: str) -> str: + return arg + + await my_custom_tool(arg="test") + + # Verify tool_name was passed correctly + call_args = evaluate_mock.call_args + assert call_args is not None + request = call_args[0][0] + assert request.tool_name == "my_custom_tool" @pytest.mark.asyncio async def test_guard_custom_tool_name(self, mock_core_client, sample_badge_jws): @@ -320,16 +316,15 @@ async def test_guard_custom_tool_name(self, mock_core_client, sample_badge_jws): mock_core_client.stub.EvaluateToolAccess = evaluate_mock with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client): - with patch("capiscio_mcp.guard._caller_badge", return_value=sample_badge_jws): - @guard(tool_name="filesystem.read") - async def read_file(path: str) -> str: - return f"Contents of {path}" - - await read_file(path="/tmp/test.txt") - - call_args = evaluate_mock.call_args - request = call_args[0][0] - assert request.tool_name == "filesystem.read" + @guard(tool_name="filesystem.read") + async def read_file(path: str) -> str: + return f"Contents of {path}" + + await read_file(path="/tmp/test.txt") + + call_args = evaluate_mock.call_args + request = call_args[0][0] + assert request.tool_name == "filesystem.read" @pytest.mark.asyncio async def test_guard_computes_params_hash(self, mock_core_client, sample_badge_jws): @@ -348,16 +343,15 @@ async def test_guard_computes_params_hash(self, mock_core_client, sample_badge_j mock_core_client.stub.EvaluateToolAccess = evaluate_mock with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client): - with patch("capiscio_mcp.guard._caller_badge", return_value=sample_badge_jws): - @guard() - async def query_db(sql: str, limit: int) -> list: - return [] - - await query_db(sql="SELECT * FROM users", limit=10) - - call_args = evaluate_mock.call_args - request = call_args[0][0] - assert request.params_hash.startswith("sha256:") + @guard() + async def query_db(sql: str, limit: int) -> list: + return [] + + await query_db(sql="SELECT * FROM users", limit=10) + + call_args = evaluate_mock.call_args + request = call_args[0][0] + assert request.params_hash.startswith("sha256:") class TestGuardSyncDecorator: @@ -384,14 +378,13 @@ async def mock_get_instance(): return mock_core_client with patch("capiscio_mcp._core.client.CoreClient.get_instance", mock_get_instance): - with patch("capiscio_mcp.guard._caller_badge", return_value=sample_badge_jws): - @guard_sync(min_trust_level=1) - def sync_read_file(path: str) -> str: - return f"Contents of {path}" - - # This should run without asyncio - result = sync_read_file(path="/tmp/test.txt") - assert result == "Contents of /tmp/test.txt" + @guard_sync(min_trust_level=1) + def sync_read_file(path: str) -> str: + return f"Contents of {path}" + + # This should run without asyncio + result = sync_read_file(path="/tmp/test.txt") + assert result == "Contents of /tmp/test.txt" class TestContextVariables: From 4105429efcc4461b6129a5f8eccdeb948e096a4f Mon Sep 17 00:00:00 2001 From: Beon de Nood Date: Sat, 17 Jan 2026 19:52:59 -0500 Subject: [PATCH 5/5] fix(tests): clear env vars in ensure_binary test to ensure download path The test_calls_download_binary test was failing because the CI environment has CAPISCIO_BINARY_PATH set, which causes ensure_binary to return early without calling download_binary. The fix clears the environment dictionary to ensure the download path is taken. --- tests/test_core_lifecycle.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/test_core_lifecycle.py b/tests/test_core_lifecycle.py index 899f481..b369c69 100644 --- a/tests/test_core_lifecycle.py +++ b/tests/test_core_lifecycle.py @@ -235,13 +235,15 @@ async def test_returns_path(self): @pytest.mark.asyncio async def test_calls_download_binary(self): - """Should call download_binary in executor.""" - with patch("capiscio_mcp._core.lifecycle.download_binary") as mock_download: - mock_download.return_value = Path("/tmp/capiscio") - - await ensure_binary() - - mock_download.assert_called_once() + """Should call download_binary in executor when no local override.""" + # Clear any CAPISCIO_BINARY_PATH env var to ensure download path is taken + with patch.dict(os.environ, {}, clear=True): + with patch("capiscio_mcp._core.lifecycle.download_binary") as mock_download: + mock_download.return_value = Path("/tmp/capiscio") + + await ensure_binary() + + mock_download.assert_called_once() class TestStartCoreProcess: