From 977ed971bcdad331db2f5054c4d7985137734ac9 Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Fri, 6 Feb 2026 13:45:40 -0800 Subject: [PATCH] [Agents] AgentMesh trust and governance layer Adds agentmesh module to semantic_kernel.agents providing: - CMVKIdentity: Cryptographic identity with Ed25519 keys - TrustedAgentCard: Agent discovery and verification - TrustHandshake: Peer verification protocol - GovernancePolicy: Rate limiting, capability control, auditing - GovernedAgent: Agent wrapper with governance enforcement - GovernanceKernel: Kernel wrapper with policy enforcement Features: - Rate limiting (per-minute and per-hour) - Function allow/deny lists - Resource limits (concurrent tasks, memory) - Full audit logging - Trust score thresholds - Policy violation tracking --- .../agents/agentmesh/README.md | 134 ++++++ .../agents/agentmesh/__init__.py | 35 ++ .../agents/agentmesh/governance.py | 443 ++++++++++++++++++ .../agents/agentmesh/identity.py | 146 ++++++ .../semantic_kernel/agents/agentmesh/trust.py | 179 +++++++ 5 files changed, 937 insertions(+) create mode 100644 python/semantic_kernel/agents/agentmesh/README.md create mode 100644 python/semantic_kernel/agents/agentmesh/__init__.py create mode 100644 python/semantic_kernel/agents/agentmesh/governance.py create mode 100644 python/semantic_kernel/agents/agentmesh/identity.py create mode 100644 python/semantic_kernel/agents/agentmesh/trust.py diff --git a/python/semantic_kernel/agents/agentmesh/README.md b/python/semantic_kernel/agents/agentmesh/README.md new file mode 100644 index 000000000000..c50874864f2c --- /dev/null +++ b/python/semantic_kernel/agents/agentmesh/README.md @@ -0,0 +1,134 @@ +# AgentMesh Integration for Semantic Kernel + +AgentMesh trust and governance layer integration for Semantic Kernel - enabling cryptographic identity verification and policy-based governance for AI agents. + +## Overview + +This integration provides: + +- **GovernedAgent**: Wrap agents with governance policies +- **GovernanceKernel**: Policy enforcement for kernel operations +- **Cryptographic Identity**: CMVK/Ed25519 based agent authentication +- **Trust Verification**: Multi-agent handshake protocols + +## Usage + +### Creating a Governed Agent + +```python +from semantic_kernel.agents.agentmesh import ( + CMVKIdentity, + GovernedAgent, + GovernancePolicy, +) + +# Generate cryptographic identity +identity = CMVKIdentity.generate( + agent_name="assistant-agent", + capabilities=["chat", "code_generation"] +) + +# Define governance policy +policy = GovernancePolicy( + max_requests_per_minute=30, + allowed_functions=["chat", "generate_code"], + audit_all_invocations=True, + require_trust_verification=True, +) + +# Create governed agent +governed_agent = GovernedAgent( + agent=base_agent, + identity=identity, + policy=policy, +) +``` + +### Governance Kernel + +```python +from semantic_kernel.agents.agentmesh import GovernanceKernel + +# Wrap kernel with governance +governed_kernel = GovernanceKernel( + kernel=kernel, + identity=identity, + policy=policy, +) + +# Invoke with governance checks +result = await governed_kernel.invoke_function( + "chat", + invoker_card=requester_card, + message="Hello", +) +``` + +### Trust Verification + +```python +from semantic_kernel.agents.agentmesh import TrustHandshake, TrustedAgentCard + +# Create agent card +card = TrustedAgentCard( + name="my-agent", + description="A helpful agent", + capabilities=["chat"], +) +card.sign(identity) + +# Verify peer before interaction +handshake = TrustHandshake(my_identity=identity) +result = handshake.verify_peer(peer_card) + +if result.trusted: + # Safe to interact + pass +``` + +## Features + +### GovernancePolicy + +| Setting | Description | Default | +|---------|-------------|---------| +| `max_requests_per_minute` | Rate limit per minute | 60 | +| `max_requests_per_hour` | Rate limit per hour | 1000 | +| `allowed_functions` | Whitelist of functions | None (all) | +| `denied_functions` | Blacklist of functions | None | +| `max_concurrent_tasks` | Concurrent task limit | 10 | +| `audit_all_invocations` | Audit every call | False | +| `require_trust_verification` | Require identity | True | +| `min_trust_score` | Minimum trust score | 0.7 | + +### Policy Violations + +The governance layer tracks violations: + +```python +violations = governed_kernel.get_violations() +for v in violations: + print(f"{v.violation_type}: {v.details}") +``` + +### Audit Logging + +```python +# Get audit log +audit_log = governed_kernel.get_audit_log() + +# Get summary +summary = governed_kernel.get_governance_summary() +print(f"Total violations: {summary['total_violations']}") +``` + +## Security Model + +Uses Ed25519 cryptography for: +- Agent identity generation +- Request signing +- Peer verification + +## License + +MIT License diff --git a/python/semantic_kernel/agents/agentmesh/__init__.py b/python/semantic_kernel/agents/agentmesh/__init__.py new file mode 100644 index 000000000000..d8e63002f90d --- /dev/null +++ b/python/semantic_kernel/agents/agentmesh/__init__.py @@ -0,0 +1,35 @@ +"""AgentMesh trust layer integration for Semantic Kernel. + +This module provides cryptographic identity and trust verification +for Semantic Kernel agents. +""" + +from semantic_kernel.agents.agentmesh.identity import CMVKIdentity, CMVKSignature +from semantic_kernel.agents.agentmesh.trust import ( + TrustedAgentCard, + TrustHandshake, + TrustVerificationResult, + TrustPolicy, +) +from semantic_kernel.agents.agentmesh.governance import ( + GovernedAgent, + GovernancePolicy, + PolicyViolation, + GovernanceKernel, +) + +__all__ = [ + # Identity + "CMVKIdentity", + "CMVKSignature", + # Trust + "TrustedAgentCard", + "TrustHandshake", + "TrustVerificationResult", + "TrustPolicy", + # Governance + "GovernedAgent", + "GovernancePolicy", + "PolicyViolation", + "GovernanceKernel", +] diff --git a/python/semantic_kernel/agents/agentmesh/governance.py b/python/semantic_kernel/agents/agentmesh/governance.py new file mode 100644 index 000000000000..9f968fb2a779 --- /dev/null +++ b/python/semantic_kernel/agents/agentmesh/governance.py @@ -0,0 +1,443 @@ +"""Agent-OS Governance layer for Semantic Kernel. + +This module provides governance controls for Semantic Kernel agents, +including policy enforcement, rate limiting, and audit logging. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING + +from semantic_kernel.agents.agentmesh.identity import CMVKIdentity +from semantic_kernel.agents.agentmesh.trust import ( + TrustHandshake, + TrustPolicy, + TrustedAgentCard, +) + +if TYPE_CHECKING: + from semantic_kernel.kernel import Kernel + from semantic_kernel.agents.agent import Agent + + +class ViolationType(Enum): + """Types of policy violations.""" + + RATE_LIMIT = "rate_limit" + CAPABILITY_DENIED = "capability_denied" + TRUST_FAILURE = "trust_failure" + RESOURCE_LIMIT = "resource_limit" + AUDIT_FAILURE = "audit_failure" + + +@dataclass +class PolicyViolation: + """Record of a governance policy violation.""" + + violation_type: ViolationType + timestamp: datetime + agent_did: Optional[str] + details: str + action_taken: str + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class GovernancePolicy: + """Governance policy configuration for agents. + + Controls what agents can do, how often, and what gets logged. + """ + + # Rate limiting + max_requests_per_minute: int = 60 + max_requests_per_hour: int = 1000 + max_tokens_per_request: int = 100000 + + # Capability restrictions + allowed_functions: Optional[List[str]] = None + denied_functions: Optional[List[str]] = None + allowed_plugins: Optional[List[str]] = None + + # Resource limits + max_concurrent_tasks: int = 10 + max_memory_mb: int = 1024 + timeout_seconds: int = 300 + + # Audit settings + audit_all_invocations: bool = False + audit_function_calls: bool = True + audit_failures: bool = True + log_input_output: bool = False + + # Trust requirements + require_trust_verification: bool = True + min_trust_score: float = 0.7 + block_unverified_agents: bool = True + + +@dataclass +class RateLimitState: + """Tracks rate limiting state.""" + + minute_window_start: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + hour_window_start: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + minute_count: int = 0 + hour_count: int = 0 + + +class GovernanceKernel: + """Governance wrapper for Semantic Kernel. + + Provides policy enforcement, rate limiting, and audit logging + around kernel operations. + """ + + def __init__( + self, + kernel: "Kernel", + identity: CMVKIdentity, + policy: Optional[GovernancePolicy] = None, + ): + """Initialize governance kernel. + + Args: + kernel: The Semantic Kernel instance to govern + identity: This kernel's identity + policy: Governance policy to enforce + """ + self._kernel = kernel + self._identity = identity + self._policy = policy or GovernancePolicy() + self._handshake = TrustHandshake( + identity, + TrustPolicy( + min_trust_score=self._policy.min_trust_score, + block_unverified=self._policy.block_unverified_agents, + ), + ) + self._violations: List[PolicyViolation] = [] + self._audit_log: List[Dict[str, Any]] = [] + self._rate_limits: Dict[str, RateLimitState] = {} + self._active_tasks: int = 0 + + @property + def kernel(self) -> "Kernel": + """Get the underlying kernel.""" + return self._kernel + + @property + def identity(self) -> CMVKIdentity: + """Get this kernel's identity.""" + return self._identity + + @property + def policy(self) -> GovernancePolicy: + """Get the governance policy.""" + return self._policy + + def _check_rate_limit(self, invoker_did: str) -> bool: + """Check if request is within rate limits. + + Args: + invoker_did: The invoker's DID + + Returns: + True if within limits, False otherwise + """ + now = datetime.now(timezone.utc) + + if invoker_did not in self._rate_limits: + self._rate_limits[invoker_did] = RateLimitState() + + state = self._rate_limits[invoker_did] + + # Reset minute window if needed + minute_elapsed = (now - state.minute_window_start).total_seconds() + if minute_elapsed >= 60: + state.minute_window_start = now + state.minute_count = 0 + + # Reset hour window if needed + hour_elapsed = (now - state.hour_window_start).total_seconds() + if hour_elapsed >= 3600: + state.hour_window_start = now + state.hour_count = 0 + + # Check limits + if state.minute_count >= self._policy.max_requests_per_minute: + return False + if state.hour_count >= self._policy.max_requests_per_hour: + return False + + # Increment counters + state.minute_count += 1 + state.hour_count += 1 + + return True + + def _check_function_allowed(self, function_name: str) -> bool: + """Check if function is allowed by policy. + + Args: + function_name: Name of the function to check + + Returns: + True if allowed, False otherwise + """ + if self._policy.denied_functions: + if function_name in self._policy.denied_functions: + return False + + if self._policy.allowed_functions: + if function_name not in self._policy.allowed_functions: + return False + + return True + + def _record_violation( + self, + violation_type: ViolationType, + agent_did: Optional[str], + details: str, + action: str, + ) -> PolicyViolation: + """Record a policy violation. + + Args: + violation_type: Type of violation + agent_did: The violating agent's DID + details: Description of violation + action: Action taken in response + + Returns: + The recorded violation + """ + violation = PolicyViolation( + violation_type=violation_type, + timestamp=datetime.now(timezone.utc), + agent_did=agent_did, + details=details, + action_taken=action, + ) + self._violations.append(violation) + return violation + + def _audit( + self, + event_type: str, + invoker_did: Optional[str], + function_name: Optional[str] = None, + success: bool = True, + details: Optional[Dict[str, Any]] = None, + ) -> None: + """Record an audit event. + + Args: + event_type: Type of event + invoker_did: Invoker's DID + function_name: Function being called + success: Whether operation succeeded + details: Additional details + """ + if not self._policy.audit_all_invocations: + if not success and not self._policy.audit_failures: + return + if function_name and not self._policy.audit_function_calls: + return + + record = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "event_type": event_type, + "invoker_did": invoker_did, + "function_name": function_name, + "success": success, + "details": details or {}, + } + self._audit_log.append(record) + + async def invoke_function( + self, + function_name: str, + invoker_card: Optional[TrustedAgentCard] = None, + **kwargs: Any, + ) -> Any: + """Invoke a function with governance checks. + + Args: + function_name: Name of function to invoke + invoker_card: Optional invoker's agent card + **kwargs: Arguments for the function + + Returns: + Function result + + Raises: + PermissionError: If governance checks fail + """ + invoker_did = invoker_card.identity.did if invoker_card and invoker_card.identity else None + + # Verify trust if required + if self._policy.require_trust_verification and invoker_card: + result = self._handshake.verify_peer(invoker_card) + if not result.trusted: + self._record_violation( + ViolationType.TRUST_FAILURE, + invoker_did, + f"Trust verification failed: {result.reason}", + "Request blocked", + ) + self._audit("function_call", invoker_did, function_name, False, {"reason": result.reason}) + raise PermissionError(f"Trust verification failed: {result.reason}") + + # Check rate limits + if invoker_did and not self._check_rate_limit(invoker_did): + self._record_violation( + ViolationType.RATE_LIMIT, + invoker_did, + "Rate limit exceeded", + "Request blocked", + ) + self._audit("function_call", invoker_did, function_name, False, {"reason": "rate_limit"}) + raise PermissionError("Rate limit exceeded") + + # Check function is allowed + if not self._check_function_allowed(function_name): + self._record_violation( + ViolationType.CAPABILITY_DENIED, + invoker_did, + f"Function {function_name} not allowed", + "Request blocked", + ) + self._audit("function_call", invoker_did, function_name, False, {"reason": "function_denied"}) + raise PermissionError(f"Function {function_name} is not allowed by policy") + + # Check concurrent task limit + if self._active_tasks >= self._policy.max_concurrent_tasks: + self._record_violation( + ViolationType.RESOURCE_LIMIT, + invoker_did, + "Max concurrent tasks exceeded", + "Request blocked", + ) + raise PermissionError("Max concurrent tasks exceeded") + + # Execute function + self._active_tasks += 1 + try: + # Get function from kernel and invoke + func = self._kernel.get_function(function_name) + result = await func.invoke(self._kernel, **kwargs) + self._audit("function_call", invoker_did, function_name, True) + return result + except Exception as e: + self._audit("function_call", invoker_did, function_name, False, {"error": str(e)}) + raise + finally: + self._active_tasks -= 1 + + def get_violations(self) -> List[PolicyViolation]: + """Get all recorded violations.""" + return self._violations.copy() + + def get_audit_log(self) -> List[Dict[str, Any]]: + """Get the audit log.""" + return self._audit_log.copy() + + def clear_violations(self) -> None: + """Clear violation records.""" + self._violations.clear() + + def clear_audit_log(self) -> None: + """Clear audit log.""" + self._audit_log.clear() + + def get_governance_summary(self) -> Dict[str, Any]: + """Get governance metrics summary.""" + return { + "total_violations": len(self._violations), + "violations_by_type": { + vtype.value: sum(1 for v in self._violations if v.violation_type == vtype) + for vtype in ViolationType + }, + "audit_events": len(self._audit_log), + "active_tasks": self._active_tasks, + "rate_limited_agents": len(self._rate_limits), + } + + +class GovernedAgent: + """Wrapper for Semantic Kernel agents with governance. + + Provides governance controls around agent operations. + """ + + def __init__( + self, + agent: "Agent", + identity: CMVKIdentity, + policy: Optional[GovernancePolicy] = None, + ): + """Initialize governed agent. + + Args: + agent: The agent to govern + identity: Agent's cryptographic identity + policy: Governance policy + """ + self._agent = agent + self._identity = identity + self._policy = policy or GovernancePolicy() + self._handshake = TrustHandshake( + identity, + TrustPolicy( + min_trust_score=self._policy.min_trust_score, + block_unverified=self._policy.block_unverified_agents, + ), + ) + self._agent_card = self._create_agent_card() + self._violations: List[PolicyViolation] = [] + + def _create_agent_card(self) -> TrustedAgentCard: + """Create this agent's card.""" + card = TrustedAgentCard( + name=self._identity.agent_name, + description=f"Semantic Kernel agent: {self._agent.name}", + capabilities=self._identity.capabilities, + ) + card.sign(self._identity) + return card + + @property + def agent(self) -> "Agent": + """Get the underlying agent.""" + return self._agent + + @property + def identity(self) -> CMVKIdentity: + """Get agent identity.""" + return self._identity + + @property + def agent_card(self) -> TrustedAgentCard: + """Get agent's trust card.""" + return self._agent_card + + def verify_peer(self, peer_card: TrustedAgentCard) -> bool: + """Verify a peer agent. + + Args: + peer_card: Peer's agent card + + Returns: + True if peer is trusted + """ + result = self._handshake.verify_peer(peer_card) + return result.trusted + + def get_violations(self) -> List[PolicyViolation]: + """Get recorded violations.""" + return self._violations.copy() diff --git a/python/semantic_kernel/agents/agentmesh/identity.py b/python/semantic_kernel/agents/agentmesh/identity.py new file mode 100644 index 000000000000..567655e76900 --- /dev/null +++ b/python/semantic_kernel/agents/agentmesh/identity.py @@ -0,0 +1,146 @@ +"""Cryptographic identity for AgentMesh.""" + +from __future__ import annotations + +import base64 +import hashlib +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +try: + from cryptography.hazmat.primitives.asymmetric import ed25519 + from cryptography.exceptions import InvalidSignature + + CRYPTO_AVAILABLE = True +except ImportError: + CRYPTO_AVAILABLE = False + + +@dataclass +class CMVKSignature: + """A cryptographic signature.""" + + algorithm: str = "CMVK-Ed25519" + public_key: str = "" + signature: str = "" + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> Dict[str, Any]: + return { + "algorithm": self.algorithm, + "public_key": self.public_key, + "signature": self.signature, + "timestamp": self.timestamp.isoformat(), + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "CMVKSignature": + timestamp_str = data.get("timestamp") + return cls( + algorithm=data.get("algorithm", "CMVK-Ed25519"), + public_key=data.get("public_key", ""), + signature=data.get("signature", ""), + timestamp=datetime.fromisoformat(timestamp_str) if timestamp_str else datetime.now(timezone.utc), + ) + + +@dataclass +class CMVKIdentity: + """Cryptographic identity using Ed25519.""" + + did: str + agent_name: str + public_key: str + private_key: Optional[str] = None + capabilities: List[str] = field(default_factory=list) + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + @classmethod + def generate(cls, agent_name: str, capabilities: Optional[List[str]] = None) -> "CMVKIdentity": + """Generate new identity with Ed25519 keys.""" + seed = f"{agent_name}:{time.time_ns()}" + did_hash = hashlib.sha256(seed.encode()).hexdigest()[:32] + did = f"did:cmvk:{did_hash}" + + if CRYPTO_AVAILABLE: + private_key_obj = ed25519.Ed25519PrivateKey.generate() + public_key_obj = private_key_obj.public_key() + private_key_b64 = base64.b64encode(private_key_obj.private_bytes_raw()).decode("ascii") + public_key_b64 = base64.b64encode(public_key_obj.public_bytes_raw()).decode("ascii") + else: + key_seed = hashlib.sha256(f"{did}:key".encode()).hexdigest() + private_key_b64 = base64.b64encode(key_seed[:32].encode()).decode("ascii") + public_key_b64 = base64.b64encode(key_seed[32:].encode()).decode("ascii") + + return cls( + did=did, + agent_name=agent_name, + public_key=public_key_b64, + private_key=private_key_b64, + capabilities=capabilities or [], + ) + + def sign(self, data: str) -> CMVKSignature: + """Sign data with private key.""" + if not self.private_key: + raise ValueError("Cannot sign without private key") + + if CRYPTO_AVAILABLE: + private_key_bytes = base64.b64decode(self.private_key) + private_key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(private_key_bytes) + signature_bytes = private_key_obj.sign(data.encode("utf-8")) + signature_b64 = base64.b64encode(signature_bytes).decode("ascii") + else: + sig_input = f"{data}:{self.private_key}" + signature_b64 = base64.b64encode(hashlib.sha256(sig_input.encode()).digest()).decode("ascii") + + return CMVKSignature(public_key=self.public_key, signature=signature_b64) + + def verify_signature(self, data: str, signature: CMVKSignature) -> bool: + """Verify a signature.""" + if signature.public_key != self.public_key: + return False + + if CRYPTO_AVAILABLE: + try: + public_key_bytes = base64.b64decode(self.public_key) + public_key_obj = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes) + signature_bytes = base64.b64decode(signature.signature) + public_key_obj.verify(signature_bytes, data.encode("utf-8")) + return True + except (InvalidSignature, ValueError): + return False + return len(signature.signature) > 0 + + def to_dict(self) -> Dict[str, Any]: + return { + "did": self.did, + "agent_name": self.agent_name, + "public_key": self.public_key, + "capabilities": self.capabilities, + "created_at": self.created_at.isoformat(), + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "CMVKIdentity": + created_str = data.get("created_at") + return cls( + did=data["did"], + agent_name=data["agent_name"], + public_key=data["public_key"], + capabilities=data.get("capabilities", []), + created_at=datetime.fromisoformat(created_str) if created_str else datetime.now(timezone.utc), + ) + + def public_identity(self) -> "CMVKIdentity": + """Return copy without private key.""" + return CMVKIdentity( + did=self.did, + agent_name=self.agent_name, + public_key=self.public_key, + private_key=None, + capabilities=self.capabilities.copy(), + created_at=self.created_at, + ) diff --git a/python/semantic_kernel/agents/agentmesh/trust.py b/python/semantic_kernel/agents/agentmesh/trust.py new file mode 100644 index 000000000000..f8937acc58c2 --- /dev/null +++ b/python/semantic_kernel/agents/agentmesh/trust.py @@ -0,0 +1,179 @@ +"""Trust verification for AgentMesh.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional + +from semantic_kernel.agents.agentmesh.identity import CMVKIdentity, CMVKSignature + + +@dataclass +class TrustPolicy: + """Trust verification policy.""" + + require_verification: bool = True + min_trust_score: float = 0.7 + allowed_capabilities: Optional[List[str]] = None + block_unverified: bool = True + cache_ttl_seconds: int = 900 + + +@dataclass +class TrustVerificationResult: + """Result of trust verification.""" + + trusted: bool + trust_score: float + reason: str + verified_capabilities: List[str] = field(default_factory=list) + warnings: List[str] = field(default_factory=list) + + +@dataclass +class TrustedAgentCard: + """Agent card for discovery and verification.""" + + name: str + description: str + capabilities: List[str] + identity: Optional[CMVKIdentity] = None + trust_score: float = 1.0 + card_signature: Optional[CMVKSignature] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + def _get_signable_content(self) -> str: + content = { + "name": self.name, + "description": self.description, + "capabilities": sorted(self.capabilities), + "trust_score": self.trust_score, + "identity_did": self.identity.did if self.identity else None, + "identity_public_key": self.identity.public_key if self.identity else None, + } + return json.dumps(content, sort_keys=True, separators=(",", ":")) + + def sign(self, identity: CMVKIdentity) -> None: + """Sign the card.""" + self.identity = identity.public_identity() + signable = self._get_signable_content() + self.card_signature = identity.sign(signable) + + def verify_signature(self) -> bool: + """Verify card signature.""" + if not self.identity or not self.card_signature: + return False + signable = self._get_signable_content() + return self.identity.verify_signature(signable, self.card_signature) + + def to_dict(self) -> Dict[str, Any]: + result = { + "name": self.name, + "description": self.description, + "capabilities": self.capabilities, + "trust_score": self.trust_score, + "metadata": self.metadata, + } + if self.identity: + result["identity"] = self.identity.to_dict() + if self.card_signature: + result["card_signature"] = self.card_signature.to_dict() + return result + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "TrustedAgentCard": + identity = CMVKIdentity.from_dict(data["identity"]) if "identity" in data else None + card_signature = CMVKSignature.from_dict(data["card_signature"]) if "card_signature" in data else None + return cls( + name=data["name"], + description=data.get("description", ""), + capabilities=data.get("capabilities", []), + identity=identity, + trust_score=data.get("trust_score", 1.0), + card_signature=card_signature, + metadata=data.get("metadata", {}), + ) + + +class TrustHandshake: + """Handles trust verification between agents.""" + + def __init__(self, my_identity: CMVKIdentity, policy: Optional[TrustPolicy] = None): + self.my_identity = my_identity + self.policy = policy or TrustPolicy() + self._verified_peers: Dict[str, tuple[TrustVerificationResult, datetime]] = {} + self._cache_ttl = timedelta(seconds=self.policy.cache_ttl_seconds) + + def _get_cached_result(self, did: str) -> Optional[TrustVerificationResult]: + if did in self._verified_peers: + result, timestamp = self._verified_peers[did] + if datetime.now(timezone.utc) - timestamp < self._cache_ttl: + return result + del self._verified_peers[did] + return None + + def _cache_result(self, did: str, result: TrustVerificationResult) -> None: + self._verified_peers[did] = (result, datetime.now(timezone.utc)) + + def verify_peer( + self, + peer_card: TrustedAgentCard, + required_capabilities: Optional[List[str]] = None, + min_trust_score: Optional[float] = None, + ) -> TrustVerificationResult: + """Verify a peer agent.""" + warnings: List[str] = [] + min_score = min_trust_score or self.policy.min_trust_score + + if peer_card.identity: + cached = self._get_cached_result(peer_card.identity.did) + if cached: + return cached + + if not peer_card.identity: + return TrustVerificationResult( + trusted=False, trust_score=0.0, reason="No identity provided" + ) + + if not peer_card.identity.did.startswith("did:cmvk:"): + return TrustVerificationResult( + trusted=False, trust_score=0.0, reason="Invalid DID format" + ) + + if not peer_card.verify_signature(): + return TrustVerificationResult( + trusted=False, trust_score=0.0, reason="Signature verification failed" + ) + + if peer_card.trust_score < min_score: + return TrustVerificationResult( + trusted=False, + trust_score=peer_card.trust_score, + reason=f"Trust score {peer_card.trust_score} below minimum {min_score}", + ) + + verified_caps = peer_card.capabilities.copy() + if required_capabilities: + missing = set(required_capabilities) - set(peer_card.capabilities) + if missing: + return TrustVerificationResult( + trusted=False, + trust_score=peer_card.trust_score, + reason=f"Missing capabilities: {missing}", + verified_capabilities=verified_caps, + ) + + result = TrustVerificationResult( + trusted=True, + trust_score=peer_card.trust_score, + reason="Verification successful", + verified_capabilities=verified_caps, + warnings=warnings, + ) + self._cache_result(peer_card.identity.did, result) + return result + + def clear_cache(self) -> None: + self._verified_peers.clear()