Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions capiscio_mcp/_proto/capiscio/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
mcp_pb2.TOOL_ISSUER_UNTRUSTED = mcp_pb2.MCP_DENY_REASON_ISSUER_UNTRUSTED
if not hasattr(mcp_pb2, 'TOOL_POLICY_DENIED'):
mcp_pb2.TOOL_POLICY_DENIED = mcp_pb2.MCP_DENY_REASON_POLICY_DENIED
if not hasattr(mcp_pb2, 'TOOL_SCOPE_INSUFFICIENT'):
mcp_pb2.TOOL_SCOPE_INSUFFICIENT = mcp_pb2.MCP_DENY_REASON_SCOPE_INSUFFICIENT

# ServerState
if not hasattr(mcp_pb2, 'SERVER_STATE_UNSPECIFIED'):
Expand Down
96 changes: 50 additions & 46 deletions capiscio_mcp/_proto/capiscio/v1/mcp_pb2.py

Large diffs are not rendered by default.

118 changes: 50 additions & 68 deletions capiscio_mcp/_proto/gen/capiscio/v1/mcp_pb2.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions capiscio_mcp/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,18 @@ def __init__(
evidence_id: str = "",
agent_did: Optional[str] = None,
trust_level: Optional[int] = None,
error_code: Optional[str] = None,
requested_capability: Optional[str] = None,
presented_capability: Optional[str] = None,
Comment on lines +80 to +82
):
self.reason = reason
self.detail = detail
self.evidence_id = evidence_id
self.agent_did = agent_did
self.trust_level = trust_level
self.error_code = error_code
self.requested_capability = requested_capability
self.presented_capability = presented_capability

message = f"{reason.value}: {detail}"
if evidence_id:
Expand Down
43 changes: 43 additions & 0 deletions capiscio_mcp/guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ class GuardResult:
# Evidence
evidence_id: str = ""
evidence_json: str = ""

# RFC-008: Structured rejection metadata
error_code: Optional[str] = None
requested_capability: Optional[str] = None
presented_capability: Optional[str] = None


def compute_params_hash(params: dict[str, Any]) -> str:
Expand Down Expand Up @@ -323,6 +328,7 @@ async def evaluate_tool_access(
mcp_pb2.TOOL_NOT_ALLOWED: DenyReason.TOOL_NOT_ALLOWED,
mcp_pb2.TOOL_ISSUER_UNTRUSTED: DenyReason.ISSUER_UNTRUSTED,
mcp_pb2.TOOL_POLICY_DENIED: DenyReason.POLICY_DENIED,
mcp_pb2.TOOL_SCOPE_INSUFFICIENT: DenyReason.SCOPE_INSUFFICIENT,
}
Comment on lines 328 to 332
deny_reason = deny_reason_map.get(response.deny_reason, DenyReason.INTERNAL_ERROR)

Expand All @@ -343,6 +349,9 @@ async def evaluate_tool_access(
trust_level=response.trust_level,
evidence_id=response.evidence_id,
evidence_json=response.evidence_json,
error_code=response.error_code or None,
requested_capability=response.requested_capability or None,
presented_capability=response.presented_capability or None,
Comment on lines 350 to +354
)


Expand Down Expand Up @@ -450,12 +459,29 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:

# Check decision
if result.decision == Decision.DENY:
logger.warning(
"capiscio.policy_enforced: tool=%s decision=DENY "
"agent=%s trust_level=%s reason=%s error_code=%s "
"requested_capability=%s presented_capability=%s "
"evidence_id=%s",
effective_tool_name,
result.agent_did,
result.trust_level,
result.deny_reason,
result.error_code,
result.requested_capability,
result.presented_capability,
result.evidence_id,
)
raise GuardError(
reason=result.deny_reason or DenyReason.INTERNAL_ERROR,
detail=result.deny_detail or "Access denied",
evidence_id=result.evidence_id,
agent_did=result.agent_did,
trust_level=result.trust_level,
error_code=result.error_code,
requested_capability=result.requested_capability,
presented_capability=result.presented_capability,
)

# Log successful access
Expand Down Expand Up @@ -544,12 +570,29 @@ async def run_eval():

# Check decision
if result.decision == Decision.DENY:
logger.warning(
"capiscio.policy_enforced: tool=%s decision=DENY "
"agent=%s trust_level=%s reason=%s error_code=%s "
"requested_capability=%s presented_capability=%s "
"evidence_id=%s",
effective_tool_name,
result.agent_did,
result.trust_level,
result.deny_reason,
result.error_code,
result.requested_capability,
result.presented_capability,
result.evidence_id,
)
raise GuardError(
reason=result.deny_reason or DenyReason.INTERNAL_ERROR,
detail=result.deny_detail or "Access denied",
evidence_id=result.evidence_id,
agent_did=result.agent_did,
trust_level=result.trust_level,
error_code=result.error_code,
requested_capability=result.requested_capability,
presented_capability=result.presented_capability,
)

# Execute tool
Expand Down
1 change: 1 addition & 0 deletions capiscio_mcp/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class DenyReason(str, Enum):
# Policy issues
TOOL_NOT_ALLOWED = "tool_not_allowed" # Tool not in allowed list
POLICY_DENIED = "policy_denied" # Policy evaluation failed
SCOPE_INSUFFICIENT = "scope_insufficient" # RFC-008: Capability class scope insufficient

# Other
INTERNAL_ERROR = "internal_error" # Unexpected error
Expand Down
25 changes: 25 additions & 0 deletions tests/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,31 @@ def test_repr(self):
assert "GuardError" in repr_str
assert "BADGE_INVALID" in repr_str

def test_scope_insufficient_fields(self):
error = GuardError(
reason=DenyReason.SCOPE_INSUFFICIENT,
detail="Capability class mismatch",
evidence_id="ev_scope_1",
agent_did="did:web:example.com:agents:test",
trust_level=2,
error_code="SCOPE_INSUFFICIENT",
requested_capability="storage",
presented_capability="compute",
)
assert error.reason == DenyReason.SCOPE_INSUFFICIENT
assert error.error_code == "SCOPE_INSUFFICIENT"
assert error.requested_capability == "storage"
assert error.presented_capability == "compute"

def test_scope_insufficient_defaults_none(self):
error = GuardError(
reason=DenyReason.POLICY_DENIED,
detail="Denied",
)
assert error.error_code is None
assert error.requested_capability is None
assert error.presented_capability is None


class TestGuardConfigError:
"""Tests for GuardConfigError."""
Expand Down
69 changes: 69 additions & 0 deletions tests/test_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,72 @@ async def documented_tool(arg: str) -> str:

assert documented_tool.__name__ == "documented_tool"
assert documented_tool.__doc__ == "This is the docstring."


class TestScopeInsufficientDenyPath:
"""Tests for SCOPE_INSUFFICIENT deny reason (RFC-008 §9.3)."""

@pytest.mark.asyncio
async def test_guard_denies_scope_insufficient(self, mock_core_client, sample_badge_jws):
"""Tool execution denied when capability class scope is insufficient."""
mock_response = MagicMock()
mock_response.decision = 2 # DENY
mock_response.deny_reason = 9 # TOOL_SCOPE_INSUFFICIENT
mock_response.deny_detail = "Capability class mismatch"
mock_response.agent_did = "did:web:example.com:agents:test"
mock_response.badge_jti = "badge-scope-1"
mock_response.auth_level = 3 # BADGE
mock_response.trust_level = 2
mock_response.evidence_id = "ev-scope-1"
mock_response.evidence_json = "{}"
mock_response.error_code = "SCOPE_INSUFFICIENT"
mock_response.requested_capability = "storage"
mock_response.presented_capability = "compute"

mock_core_client.stub.EvaluateToolAccess = AsyncMock(return_value=mock_response)

with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client):
@guard()
async def write_file(path: str) -> str:
return f"Written to {path}"

with pytest.raises(GuardError) as exc_info:
await write_file(path="/tmp/test.txt")

err = exc_info.value
assert err.reason == DenyReason.SCOPE_INSUFFICIENT
assert err.error_code == "SCOPE_INSUFFICIENT"
assert err.requested_capability == "storage"
assert err.presented_capability == "compute"

@pytest.mark.asyncio
async def test_evaluate_propagates_scope_fields(self, mock_core_client):
"""GuardResult carries error_code/requested/presented from gRPC response."""
mock_response = MagicMock()
mock_response.decision = 2 # DENY
mock_response.deny_reason = 9 # TOOL_SCOPE_INSUFFICIENT
mock_response.deny_detail = "Scope mismatch"
mock_response.agent_did = "did:web:example.com:agents:test"
mock_response.badge_jti = "badge-scope-2"
mock_response.auth_level = 3
mock_response.trust_level = 2
mock_response.evidence_id = "ev-scope-2"
mock_response.evidence_json = "{}"
mock_response.error_code = "SCOPE_INSUFFICIENT"
mock_response.requested_capability = "network"
mock_response.presented_capability = "storage"

mock_core_client.stub.EvaluateToolAccess = AsyncMock(return_value=mock_response)

from capiscio_mcp.guard import evaluate_tool_access

with patch("capiscio_mcp._core.client.CoreClient.get_instance", return_value=mock_core_client):
result = await evaluate_tool_access(
tool_name="my_tool",
params={},
)

assert result.deny_reason == DenyReason.SCOPE_INSUFFICIENT
assert result.error_code == "SCOPE_INSUFFICIENT"
assert result.requested_capability == "network"
assert result.presented_capability == "storage"
Loading