diff --git a/README.md b/README.md index 4428e24..a9f6d5f 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,12 @@ Tool-level security for Model Context Protocol servers. ## Installation ```bash -# Standalone (no MCP SDK dependency) pip install capiscio-mcp +``` + +For MCP SDK integration (FastMCP wrapper): -# With MCP SDK integration +```bash pip install capiscio-mcp[mcp] ``` @@ -77,30 +79,6 @@ async def execute_query(sql: str) -> list[dict]: pass ``` -### With MCP SDK Integration - -```python -from capiscio_mcp.integrations.mcp import CapiscioMCPServer - -server = CapiscioMCPServer( - name="filesystem", - did="did:web:mcp.example.com:servers:filesystem", - badge="eyJhbGc...", # Server's trust badge -) - -@server.tool(min_trust_level=2) -async def read_file(path: str) -> str: - """Read a file (requires Trust Level 2+).""" - with open(path) as f: - return f.read() - -@server.tool(min_trust_level=3) -async def write_file(path: str, content: str) -> None: - """Write a file (requires Trust Level 3+).""" - with open(path, "w") as f: - f.write(content) -``` - ## Quickstart 2: Client-Side (Server Verification) Verify the identity of MCP servers you connect to: @@ -122,22 +100,6 @@ elif result.state == ServerState.UNVERIFIED_ORIGIN: print("Warning: Server did not disclose identity") ``` -### With MCP SDK Integration - -```python -from capiscio_mcp.integrations.mcp import CapiscioMCPClient - -async with CapiscioMCPClient( - server_url="https://mcp.example.com", - min_trust_level=2, # Require verified identity - badge="eyJhbGc...", # Your client badge -) as client: - # Server identity already verified - print(f"Connected at trust level {client.server_trust_level}") - - result = await client.call_tool("read_file", {"path": "/data/file.txt"}) -``` - ## Quickstart 3: Server Registration Register your MCP server's identity with the CapiscIO registry: @@ -149,6 +111,7 @@ from capiscio_mcp import setup_server_identity result = await setup_server_identity( server_id="550e8400-e29b-41d4-a716-446655440000", # From dashboard api_key="sk_live_...", # Registry API key + ca_url="https://registry.capisc.io", # Optional, defaults to production output_dir="./keys", ) @@ -171,7 +134,67 @@ await register_server_identity( api_key="sk_live_...", did=keys["did_key"], public_key=keys["public_key_pem"], + ca_url="https://registry.capisc.io", # Optional, defaults to production +) +``` + +## MCP SDK Integration + +For seamless integration with the official [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk), install with the `mcp` extra: + +```bash +pip install capiscio-mcp[mcp] +``` + +### Server with FastMCP Wrapper + +Create an MCP server with built-in trust enforcement: + +```python +from capiscio_mcp.integrations.mcp import CapiscioMCPServer + +server = CapiscioMCPServer( + name="filesystem", + did="did:web:mcp.example.com:servers:filesystem", + badge="eyJhbGc...", # From CapiscIO registry ) + +@server.tool(min_trust_level=2) +async def read_file(path: str) -> str: + """Only agents with Trust Level 2+ can read files.""" + with open(path) as f: + return f.read() + +@server.tool(min_trust_level=0) +async def list_files(directory: str) -> list[str]: + """Any authenticated agent can list files.""" + import os + return os.listdir(directory) + +# Run the server (stdio transport) +server.run() +``` + +### Client with Trust Verification + +Connect to MCP servers via stdio transport: + +```python +from capiscio_mcp.integrations.mcp import CapiscioMCPClient + +async with CapiscioMCPClient( + command="python", + args=["my_mcp_server.py"], + min_trust_level=1, + badge="eyJhbGc...", # Your client badge +) as client: + # List available tools + tools = await client.list_tools() + print(f"Available tools: {[t['name'] for t in tools]}") + + # Call a tool + result = await client.call_tool("read_file", {"path": "/data/config.json"}) + print(result) ``` ## Core Connection Modes @@ -296,7 +319,7 @@ config = VerifyConfig( - `verify_server(server_did, server_badge, transport_origin, endpoint_path, config)` — Async verification - `verify_server_sync(...)` — Sync verification -- `verify_server_strict(...)` — Raises on any verification failure +- `verify_server_strict(...)` — Raises ServerVerifyError on any verification failure - `parse_http_headers(headers)` — Extract identity from HTTP headers - `parse_jsonrpc_meta(meta)` — Extract identity from MCP _meta - `VerifyConfig` — Configuration dataclass @@ -319,9 +342,22 @@ config = VerifyConfig( - `Decision` — ALLOW / DENY - `AuthLevel` — ANONYMOUS / API_KEY / BADGE - `DenyReason` — Enumeration of denial reasons +- `TrustLevel` — Trust levels 0-4 per RFC-002 - `ServerState` — VERIFIED_PRINCIPAL / DECLARED_PRINCIPAL / UNVERIFIED_ORIGIN - `ServerErrorCode` — Enumeration of verification error codes -- `TrustLevel` — 0-4 trust level enum + +### MCP SDK Integration (optional) + +Requires `pip install capiscio-mcp[mcp]`: + +- `CapiscioMCPServer(name, did, badge, ...)` — FastMCP wrapper with trust enforcement +- `CapiscioMCPServer.tool(min_trust_level=...)` — Decorator for guarded tools +- `CapiscioMCPServer.run(transport="stdio")` — Run the server +- `CapiscioMCPClient(command, args, ...)` — Client for stdio transport* +- `CapiscioMCPClient.call_tool(name, args)` — Call a tool on the server +- `CapiscioMCPClient.list_tools()` — List available tools + +*Note: Server identity verification in `CapiscioMCPClient` requires MCP SDK support for `_meta` passthrough in initialize responses. This is not yet available, so `min_trust_level` and `fail_on_unverified` parameters are currently not enforced. Server-side trust enforcement via `@server.tool(min_trust_level=...)` works fully. ## Documentation diff --git a/capiscio_mcp/__init__.py b/capiscio_mcp/__init__.py index 08e641c..352c2e7 100644 --- a/capiscio_mcp/__init__.py +++ b/capiscio_mcp/__init__.py @@ -50,6 +50,7 @@ async def read_database(query: str) -> list[dict]: DenyReason, ServerState, ServerErrorCode, + TrustLevel, ) from capiscio_mcp.errors import ( GuardError, @@ -63,10 +64,12 @@ async def read_database(query: str) -> list[dict]: GuardConfig, GuardResult, compute_params_hash, + evaluate_tool_access, ) from capiscio_mcp.server import ( verify_server, verify_server_sync, + verify_server_strict, VerifyConfig, VerifyResult, parse_http_headers, @@ -112,6 +115,7 @@ async def read_database(query: str) -> list[dict]: "DenyReason", "ServerState", "ServerErrorCode", + "TrustLevel", # Errors "GuardError", "ServerVerifyError", @@ -123,9 +127,11 @@ async def read_database(query: str) -> list[dict]: "GuardConfig", "GuardResult", "compute_params_hash", + "evaluate_tool_access", # Server (RFC-007) "verify_server", "verify_server_sync", + "verify_server_strict", "VerifyConfig", "VerifyResult", "parse_http_headers", diff --git a/capiscio_mcp/integrations/mcp.py b/capiscio_mcp/integrations/mcp.py index 49229c3..d76b2bc 100644 --- a/capiscio_mcp/integrations/mcp.py +++ b/capiscio_mcp/integrations/mcp.py @@ -20,6 +20,9 @@ async def read_file(path: str) -> str: with open(path) as f: return f.read() + # Run the server + server.run() + Usage (Client): from capiscio_mcp.integrations.mcp import CapiscioMCPClient @@ -39,22 +42,25 @@ async def read_file(path: str) -> str: from functools import wraps from typing import Any, Callable, Coroutine, Dict, List, Optional, TypeVar, Union -# Check if MCP SDK is available +# Check if MCP SDK (FastMCP) is available try: - from mcp.server import Server as McpServer + from mcp.server.fastmcp import FastMCP from mcp.types import Tool, TextContent MCP_AVAILABLE = True except ImportError: - McpServer = None # type: ignore + FastMCP = None # type: ignore Tool = None # type: ignore TextContent = None # type: ignore MCP_AVAILABLE = False try: - from mcp.client.session import ClientSession as McpClient + from mcp.client.session import ClientSession as McpClientSession + from mcp.client.stdio import stdio_client, StdioServerParameters MCP_CLIENT_AVAILABLE = True except ImportError: - McpClient = None # type: ignore + McpClientSession = None # type: ignore + stdio_client = None # type: ignore + StdioServerParameters = None # type: ignore MCP_CLIENT_AVAILABLE = False # Check if cryptography is available for PoP @@ -99,7 +105,7 @@ async def read_file(path: str) -> str: def _require_mcp_server() -> None: - """Raise ImportError if MCP server SDK is not available.""" + """Raise ImportError if MCP server SDK (FastMCP) is not available.""" if not MCP_AVAILABLE: raise ImportError( "MCP SDK integration requires the 'mcp' package. " @@ -120,10 +126,10 @@ class CapiscioMCPServer: """ MCP Server with CapiscIO identity disclosure, PoP signing, and tool guarding. - This class wraps an MCP Server to: + This class wraps FastMCP to: 1. Automatically inject identity into initialize response _meta 2. Sign PoP challenges to prove key ownership (RFC-007) - 3. Guard registered tools with @guard decorator + 3. Guard registered tools with @guard decorator for trust enforcement Attributes: name: Server name @@ -146,7 +152,7 @@ async def read_file(path: str) -> str: return f.read() # Run the server - await server.run_stdio() + server.run() """ def __init__( @@ -194,7 +200,8 @@ def __init__( elif private_key_pem is not None: self._private_key = load_private_key_from_pem(private_key_pem) - self._server = McpServer(name) + # Create underlying FastMCP server + self._server = FastMCP(name) self._tools: Dict[str, Callable] = {} self._tool_configs: Dict[str, GuardConfig] = {} @@ -297,8 +304,8 @@ def tool( Register a tool with CapiscIO guard. This decorator: - 1. Registers the function as an MCP tool - 2. Wraps it with @guard for access control + 1. Registers the function as an MCP tool via FastMCP + 2. Wraps it with @guard for access control based on caller trust level Args: name: Tool name (default: function name) @@ -330,13 +337,14 @@ def decorator( # Apply guard decorator guarded_func = guard(config=effective_config, tool_name=tool_name)(func) - # Store for registration + # Store for reference self._tools[tool_name] = guarded_func self._tool_configs[tool_name] = effective_config - # Register with MCP server - # Note: Registration API depends on MCP SDK version - # This is a placeholder for the actual registration + # Register with FastMCP server using its @tool decorator + # FastMCP will handle the MCP protocol details + self._server.tool(name=tool_name, description=tool_description)(guarded_func) + logger.debug(f"Registered tool '{tool_name}' with trust level {effective_config.min_trust_level}") return guarded_func @@ -344,8 +352,8 @@ def decorator( return decorator @property - def server(self) -> "McpServer": - """Access the underlying MCP server.""" + def server(self) -> "FastMCP": + """Access the underlying FastMCP server.""" return self._server @property @@ -353,15 +361,34 @@ def identity_meta(self) -> Dict[str, str]: """Get the identity metadata for initialize response.""" return self._identity_meta.copy() - async def run_stdio(self) -> None: - """Run the server over stdio transport.""" - # Implementation depends on MCP SDK - pass + def run(self, transport: str = "stdio") -> None: + """ + Run the server with the specified transport. + + Args: + transport: Transport type - "stdio" (default) or "streamable-http" + + Example: + server.run() # stdio transport + server.run(transport="streamable-http") # HTTP transport + """ + self._server.run(transport=transport) + + def run_stdio(self) -> None: + """Run the server over stdio transport. + + Deprecated: Use run() instead. + """ + # For backwards compatibility, delegate to run() + self._server.run(transport="stdio") - async def run_sse(self, port: int = 8080) -> None: - """Run the server over SSE transport.""" - # Implementation depends on MCP SDK - pass + def run_sse(self, port: int = 8080) -> None: + """Run the server over SSE transport. + + Deprecated: Use run(transport="sse") instead. SSE is deprecated in favor of streamable-http. + """ + logger.warning("SSE transport is deprecated, use streamable-http instead") + self._server.run(transport="sse") class CapiscioMCPClient: @@ -393,11 +420,21 @@ class CapiscioMCPClient: print(f"PoP verified: {client.pop_verified}") result = await client.call_tool("read_file", {"path": "/data/file.txt"}) + + For stdio transport (subprocess server): + async with CapiscioMCPClient( + command="python", + args=["my_mcp_server.py"], + min_trust_level=1, + ) as client: + result = await client.call_tool("my_tool", {"arg": "value"}) """ def __init__( self, - server_url: str, + server_url: Optional[str] = None, + command: Optional[str] = None, + args: Optional[List[str]] = None, min_trust_level: int = 0, fail_on_unverified: bool = True, require_pop: bool = False, @@ -409,17 +446,31 @@ def __init__( Initialize CapiscIO MCP Client. Args: - server_url: URL of the MCP server + server_url: URL of the MCP server (for HTTP transport) + command: Command to run server (for stdio transport) + args: Arguments for command (for stdio transport) min_trust_level: Minimum required server trust level fail_on_unverified: If True, raise when server doesn't disclose identity require_pop: If True, require PoP verification for did:key servers verify_config: Full verification configuration badge: Client badge for authentication (recommended) api_key: Client API key for authentication (alternative) + + Raises: + ValueError: If neither server_url nor command is provided """ _require_mcp_client() + # Ensure at least one transport method is configured + if server_url is None and command is None: + raise ValueError( + "Either server_url or command must be provided to CapiscioMCPClient " + "to select an HTTP or stdio transport." + ) + self.server_url = server_url + self.command = command + self.args = args or [] self.min_trust_level = min_trust_level self.fail_on_unverified = fail_on_unverified self.require_pop = require_pop @@ -431,8 +482,8 @@ def __init__( api_key=api_key, ) - self._client: Optional[McpClient] = None - self._session: Optional[Any] = None + self._session: Optional[McpClientSession] = None + self._context_manager: Optional[Any] = None self._verify_result: Optional[VerifyResult] = None # PoP state @@ -550,75 +601,68 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: async def connect(self) -> None: """ - Connect to MCP server and verify identity. - - Raises: - ServerVerifyError: If server verification fails and fail_on_unverified=True - GuardError: If server doesn't meet trust requirements - """ - # Connect to MCP server - # Implementation depends on MCP SDK transport - - # Extract server identity from initialize response - # This is a placeholder - actual implementation depends on MCP SDK - server_did: Optional[str] = None - server_badge: Optional[str] = None - - # If we get _meta from initialize response: - # server_did, server_badge = parse_jsonrpc_meta(init_result.meta) - - # Verify server identity - self._verify_result = await verify_server( - server_did=server_did, - server_badge=server_badge, - transport_origin=self.server_url, - config=self.verify_config, - ) + Connect to MCP server. - # Enforce requirements - if self.fail_on_unverified and self._verify_result.state == ServerState.UNVERIFIED_ORIGIN: - raise ServerVerifyError( - error_code=self._verify_result.error_code, - detail=f"Server at {self.server_url} did not disclose identity", - state=self._verify_result.state, - ) + For stdio transport, spawns the server process. + For HTTP transport, connects to the server URL (not yet implemented). - if self._verify_result.state == ServerState.DECLARED_PRINCIPAL and self.min_trust_level > 0: - raise ServerVerifyError( - error_code=self._verify_result.error_code, - detail=f"Server at {self.server_url} did not provide verifiable badge", - state=self._verify_result.state, - server_did=self._verify_result.server_did, - ) + Note: + Server identity verification (min_trust_level, fail_on_unverified) is + not yet functional due to MCP SDK limitations. The SDK does not currently + expose _meta from the initialize response, which is needed to extract + server DID and badge. Server-side trust enforcement works fully. - if ( - self._verify_result.state == ServerState.VERIFIED_PRINCIPAL - and self._verify_result.trust_level is not None - and self._verify_result.trust_level < self.min_trust_level - ): - raise ServerVerifyError( - error_code=self._verify_result.error_code, - detail=( - f"Server trust level {self._verify_result.trust_level} " - f"is below required {self.min_trust_level}" - ), - state=self._verify_result.state, - server_did=self._verify_result.server_did, + Raises: + NotImplementedError: If HTTP transport is requested (not yet supported) + """ + if self.command: + # Stdio transport - spawn server process + server_params = StdioServerParameters( + command=self.command, + args=self.args, ) - - logger.info( - f"Connected to {self.server_url}: " - f"state={self._verify_result.state.value}, " - f"trust_level={self._verify_result.trust_level}" - ) + self._context_manager = stdio_client(server_params) + try: + read_stream, write_stream = await self._context_manager.__aenter__() + self._session = McpClientSession(read_stream, write_stream) + try: + await self._session.__aenter__() + # Initialize the session + await self._session.initialize() + except Exception: + # Clean up session on failure + self._session = None + raise + except Exception: + # Clean up context manager on failure + if self._context_manager: + try: + await self._context_manager.__aexit__(None, None, None) + except Exception: + pass + self._context_manager = None + raise + else: + # HTTP transport would go here + logger.warning("HTTP transport not yet implemented, use stdio with command/args") + raise NotImplementedError("HTTP transport not yet implemented") + + # Note: Server identity verification is not yet functional. + # MCP SDK currently doesn't expose _meta from initialize response, + # so we cannot extract server_did and server_badge for verification. + # The min_trust_level and fail_on_unverified parameters are stored + # for future use when MCP SDK adds _meta passthrough support. + + logger.info(f"Connected to MCP server: {self.command or self.server_url}") async def close(self) -> None: """Close connection to MCP server.""" if self._session: - # Close session - pass - self._session = None - self._client = None + await self._session.__aexit__(None, None, None) + self._session = None + if self._context_manager: + await self._context_manager.__aexit__(None, None, None) + self._context_manager = None @property def server_state(self) -> Optional[ServerState]: @@ -649,7 +693,7 @@ async def call_tool( arguments: Optional[Dict[str, Any]] = None, ) -> Any: """ - Call a tool on the verified server. + Call a tool on the connected server. Automatically includes client credentials in the request. @@ -658,7 +702,7 @@ async def call_tool( arguments: Tool arguments Returns: - Tool result + Tool result from the server Raises: RuntimeError: If not connected @@ -669,12 +713,13 @@ async def call_tool( # Set credential context for the call token = set_credential(self._credential) try: - # Call tool via MCP client - # Implementation depends on MCP SDK - pass + # Call tool via MCP client session + result = await self._session.call_tool(name, arguments or {}) + return result finally: - # Reset credential context - pass + # Reset credential context to avoid leakage between calls/tasks + from capiscio_mcp.guard import _current_credential + _current_credential.reset(token) async def list_tools(self) -> List[Dict[str, Any]]: """ @@ -686,6 +731,11 @@ async def list_tools(self) -> List[Dict[str, Any]]: if self._session is None: raise RuntimeError("Client not connected. Use 'async with' context.") - # List tools via MCP client - # Implementation depends on MCP SDK - return [] + result = await self._session.list_tools() + return [ + { + "name": tool.name, + "description": tool.description, + } + for tool in result.tools + ] diff --git a/tests/test_integrations.py b/tests/test_integrations.py index f7b33a9..a0a8980 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -40,7 +40,7 @@ class TestCapiscioMCPServer: def test_init_with_required_params(self): """Should initialize with required parameters.""" with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.McpServer"): + with patch("capiscio_mcp.integrations.mcp.FastMCP"): server = CapiscioMCPServer( name="filesystem", did="did:web:mcp.example.com:servers:fs", @@ -52,7 +52,7 @@ def test_init_with_required_params(self): def test_init_with_badge(self): """Should accept optional badge.""" with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.McpServer"): + with patch("capiscio_mcp.integrations.mcp.FastMCP"): server = CapiscioMCPServer( name="filesystem", did="did:web:mcp.example.com:servers:fs", @@ -64,7 +64,7 @@ def test_init_with_badge(self): def test_init_with_default_trust_level(self): """Should accept default_min_trust_level.""" with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.McpServer"): + with patch("capiscio_mcp.integrations.mcp.FastMCP"): server = CapiscioMCPServer( name="filesystem", did="did:web:mcp.example.com:servers:fs", @@ -87,9 +87,9 @@ def test_init_raises_without_mcp(self): def test_tool_decorator(self): """tool decorator should register guarded tools.""" with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.McpServer") as mock_mcp_server: + with patch("capiscio_mcp.integrations.mcp.FastMCP") as mock_fastmcp: mock_instance = MagicMock() - mock_mcp_server.return_value = mock_instance + mock_fastmcp.return_value = mock_instance server = CapiscioMCPServer( name="filesystem", @@ -107,9 +107,9 @@ async def read_file(path: str) -> str: def test_tool_decorator_with_custom_name(self): """tool decorator should accept custom tool name.""" with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.McpServer") as mock_mcp_server: + with patch("capiscio_mcp.integrations.mcp.FastMCP") as mock_fastmcp: mock_instance = MagicMock() - mock_mcp_server.return_value = mock_instance + mock_fastmcp.return_value = mock_instance server = CapiscioMCPServer( name="filesystem", @@ -127,7 +127,7 @@ async def read_file(path: str) -> str: def test_tool_uses_default_trust_level(self): """tool should use server's default_min_trust_level.""" with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.McpServer"): + with patch("capiscio_mcp.integrations.mcp.FastMCP"): server = CapiscioMCPServer( name="filesystem", did="did:web:mcp.example.com:servers:fs", @@ -141,11 +141,11 @@ async def test_tool() -> str: # Tool should use default trust level 3 def test_server_property_returns_underlying_server(self): - """server property should return underlying MCP server.""" + """server property should return underlying FastMCP server.""" with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.McpServer") as mock_mcp_server: + with patch("capiscio_mcp.integrations.mcp.FastMCP") as mock_fastmcp: mock_instance = MagicMock() - mock_mcp_server.return_value = mock_instance + mock_fastmcp.return_value = mock_instance server = CapiscioMCPServer( name="filesystem", @@ -159,8 +159,8 @@ def test_server_property_returns_underlying_server(self): class TestCapiscioMCPClient: """Tests for CapiscioMCPClient class.""" - def test_init_with_required_params(self): - """Should initialize with required parameters.""" + def test_init_with_server_url(self): + """Should initialize with server_url parameter.""" with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): client = CapiscioMCPClient( server_url="https://mcp.example.com", @@ -168,11 +168,31 @@ def test_init_with_required_params(self): assert client.server_url == "https://mcp.example.com" + def test_init_with_command(self): + """Should initialize with command parameter for stdio transport.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + client = CapiscioMCPClient( + command="python", + args=["server.py"], + ) + + assert client.command == "python" + assert client.args == ["server.py"] + + def test_init_raises_without_transport(self): + """Should raise ValueError if neither server_url nor command provided.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + with pytest.raises(ValueError) as exc_info: + client = CapiscioMCPClient() + + assert "server_url or command" in str(exc_info.value) + def test_init_with_min_trust_level(self): """Should accept min_trust_level.""" with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): client = CapiscioMCPClient( - server_url="https://mcp.example.com", + command="python", + args=["server.py"], min_trust_level=2, ) @@ -182,7 +202,8 @@ def test_init_with_fail_on_unverified(self): """Should accept fail_on_unverified flag.""" with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): client = CapiscioMCPClient( - server_url="https://mcp.example.com", + command="python", + args=["server.py"], fail_on_unverified=False, ) @@ -193,186 +214,97 @@ def test_init_raises_without_mcp(self): with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", False): with pytest.raises(ImportError) as exc_info: client = CapiscioMCPClient( - server_url="https://mcp.example.com", + command="python", + args=["server.py"], ) assert "pip install capiscio-mcp[mcp]" in str(exc_info.value) @pytest.mark.asyncio - async def test_context_manager_enter(self): - """Should connect on context manager enter.""" + async def test_context_manager_cleanup(self): + """Context manager should clean up on exit.""" with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.VERIFIED_PRINCIPAL, - trust_level=2, - error_code=None, - server_did="did:web:example.com", - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - ) - - async with client: - # Verify connect was called (verify_server invoked) - mock_verify.assert_called() - - @pytest.mark.asyncio - async def test_context_manager_exit(self): - """Should disconnect on context manager exit.""" - with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.VERIFIED_PRINCIPAL, - trust_level=2, - error_code=None, - server_did="did:web:example.com", - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - ) - - async with client: - pass - - # After exit, session should be cleaned up - assert client._session is None - - @pytest.mark.asyncio - async def test_verifies_server_on_connect(self): - """Should verify server identity on connect.""" - with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.VERIFIED_PRINCIPAL, - trust_level=2, - error_code=None, - server_did="did:web:example.com", - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - ) - - async with client: - mock_verify.assert_called() - - @pytest.mark.asyncio - async def test_raises_on_unverified_when_configured(self): - """Should raise when server is unverified and fail_on_unverified=True.""" - from capiscio_mcp.types import ServerErrorCode - - with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.UNVERIFIED_ORIGIN, - trust_level=None, - error_code=ServerErrorCode.NONE, - server_did=None, - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - fail_on_unverified=True, - ) - - with pytest.raises(ServerVerifyError): + with patch("capiscio_mcp.integrations.mcp.stdio_client") as mock_stdio: + with patch("capiscio_mcp.integrations.mcp.McpClientSession") as mock_session: + # Set up mocks + mock_cm = AsyncMock() + mock_cm.__aenter__.return_value = (MagicMock(), MagicMock()) + mock_stdio.return_value = mock_cm + + mock_session_instance = AsyncMock() + mock_session_instance.initialize = AsyncMock() + mock_session.return_value = mock_session_instance + + client = CapiscioMCPClient( + command="python", + args=["server.py"], + ) + async with client: pass + + # After exit, session should be cleaned up + assert client._session is None + assert client._context_manager is None @pytest.mark.asyncio - async def test_allows_unverified_when_configured(self): - """Should allow unverified server when fail_on_unverified=False.""" - with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.UNVERIFIED_ORIGIN, - trust_level=None, - error_code=None, - server_did=None, - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - fail_on_unverified=False, # Allow unverified - ) - - # Should not raise - async with client: - assert client.server_state == ServerState.UNVERIFIED_ORIGIN - - @pytest.mark.asyncio - async def test_server_state_property(self): - """server_state property should return verification state.""" + async def test_http_transport_not_implemented(self): + """HTTP transport should raise NotImplementedError.""" with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.VERIFIED_PRINCIPAL, - trust_level=2, - error_code=None, - server_did="did:web:example.com", - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - ) - + client = CapiscioMCPClient( + server_url="https://mcp.example.com", + ) + + with pytest.raises(NotImplementedError) as exc_info: async with client: - assert client.server_state == ServerState.VERIFIED_PRINCIPAL + pass + + assert "HTTP transport" in str(exc_info.value) - @pytest.mark.asyncio - async def test_server_trust_level_property(self): - """server_trust_level property should return trust level.""" + def test_server_state_property_before_connect(self): + """server_state property should return None before connect.""" with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.VERIFIED_PRINCIPAL, - trust_level=3, - error_code=None, - server_did="did:web:example.com", - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - ) - - async with client: - assert client.server_trust_level == 3 + client = CapiscioMCPClient( + command="python", + args=["server.py"], + ) + + assert client.server_state is None - @pytest.mark.asyncio - async def test_call_tool(self): - """call_tool should delegate to underlying client.""" + def test_server_trust_level_property_before_connect(self): + """server_trust_level property should return None before connect.""" with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): - with patch("capiscio_mcp.integrations.mcp.verify_server") as mock_verify: - mock_verify.return_value = MagicMock( - state=ServerState.VERIFIED_PRINCIPAL, - trust_level=2, - error_code=None, - server_did="did:web:example.com", - ) - - client = CapiscioMCPClient( - server_url="https://mcp.example.com", - ) - - # For this test, we just verify the not-connected error - # since actual MCP client integration is not yet implemented - with pytest.raises(RuntimeError) as exc_info: - await client.call_tool("read_file", {"path": "/tmp/test.txt"}) - - assert "not connected" in str(exc_info.value).lower() + client = CapiscioMCPClient( + command="python", + args=["server.py"], + ) + + assert client.server_trust_level is None @pytest.mark.asyncio async def test_call_tool_raises_when_not_connected(self): """call_tool should raise when not connected.""" - with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): client = CapiscioMCPClient( - server_url="https://mcp.example.com", + command="python", + args=["server.py"], ) with pytest.raises(RuntimeError) as exc_info: await client.call_tool("test", {}) assert "not connected" in str(exc_info.value).lower() + + @pytest.mark.asyncio + async def test_list_tools_raises_when_not_connected(self): + """list_tools should raise when not connected.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + client = CapiscioMCPClient( + command="python", + args=["server.py"], + ) + + with pytest.raises(RuntimeError) as exc_info: + await client.list_tools() + + assert "not connected" in str(exc_info.value).lower()