Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agentmint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .types import DelegationStatus, DelegationResult, EnforceMode
from .decorator import (
AuthorizationError,
notarise,
require_receipt,
set_receipt,
get_receipt,
Expand Down Expand Up @@ -58,6 +59,7 @@
"ReplayError",
"DeniedError",
"AuthorizationError",
"notarise",
# Decorator
"require_receipt",
"set_receipt",
Expand Down
62 changes: 62 additions & 0 deletions agentmint/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Receipt chain utilities."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Sequence

from .patterns import matches_pattern


def intersect_scopes(
parent_scope: Sequence[str],
requested: Sequence[str],
) -> tuple[str, ...]:
"""Return the effective delegated scope."""

result = []
for child in requested:
for parent in parent_scope:
if child == parent:
if child not in result:
result.append(child)
elif matches_pattern(child, parent):
if child not in result:
result.append(child)
elif matches_pattern(parent, child):
if parent not in result:
result.append(parent)
return tuple(result)


@dataclass(frozen=True)
class ChainVerification:
"""Result of verifying an ordered receipt chain."""

valid: bool
length: int
root_hash: str
break_at_index: Optional[int] = None
reason: str = ""


def verify_chain(receipts: Sequence[object]) -> ChainVerification:
"""Verify chain linkage using previous receipt hashes."""

if not receipts:
return ChainVerification(valid=True, length=0, root_hash="")

previous_hash = None
for index, receipt in enumerate(receipts):
current_previous = getattr(receipt, "previous_receipt_hash", None)
if current_previous != previous_hash:
return ChainVerification(
valid=False,
length=len(receipts),
root_hash="",
break_at_index=index,
reason="chain break at index %d" % index,
)
previous_hash = getattr(receipt, "canonical_hash")()

return ChainVerification(valid=True, length=len(receipts), root_hash=previous_hash or "")
43 changes: 42 additions & 1 deletion agentmint/decorator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Decorator for protecting functions with receipts."""
"""Decorator helpers for AgentMint authorization and notarisation."""

from __future__ import annotations
from contextvars import ContextVar
Expand Down Expand Up @@ -82,3 +82,44 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
return wrapper

return decorator


def notarise(
notary,
action: Optional[str] = None,
plan=None,
agent: Optional[str] = None,
evidence=None,
enable_timestamp: bool = True,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
"""Decorator that records a receipt after a successful function call."""

def decorator(func: Callable[P, T]) -> Callable[P, T]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
result = func(*args, **kwargs)
if callable(evidence):
receipt_evidence = evidence(*args, **kwargs, result=result)
elif evidence is None:
receipt_evidence = {
"function": func.__name__,
"args": list(args),
"kwargs": kwargs,
}
else:
receipt_evidence = dict(evidence)

receipt_action = action or func.__name__
wrapper.last_receipt = notary.notarise(
action=receipt_action,
agent=agent,
plan=plan,
evidence=receipt_evidence,
enable_timestamp=enable_timestamp,
)
return result

wrapper.last_receipt = None # type: ignore[attr-defined]
return wrapper

return decorator
Loading
Loading