Skip to content
Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
python-version: "3.12"

- name: Install dependencies
run: python -m pip install --upgrade pip setuptools && pip install -e ".[dev]" agent-compliance
run: python -m pip install --upgrade pip setuptools && pip install -e ".[dev]" "agent-governance-toolkit>=4.1"

- name: Generate evidence file
run: python scripts/gen_agt_evidence.py
Expand Down
41 changes: 41 additions & 0 deletions experiments/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# cMCP Experiments

Reproducible experiments backing technical claims in the cMCP papers.

Each experiment imports directly from `cmcp_runtime`. Run from the repo root after `pip install -e .`.

## Experiments

| Dir | Claim | Key result |
|-----|-------|-----------|
| [claim1-policy-hash-binding](claim1-policy-hash-binding/) | Claim 1 — TEE-measured policy enforcement | Deterministic hash, 51% avalanche on 1-char change, PolicyHashMismatch, TRACE sig invalidated |
| [claim2-session-vs-call-policy](claim2-session-vs-call-policy/) | Claim 2 — Session sensitivity state | Session policy catches 2/2 PHI cross-boundary violations; per-call catches 0/2 |
| [claim2-false-positive-rate](claim2-false-positive-rate/) | Claim 2 — Session sensitivity state (cost) | Overall FPR 69%; Billing/Batch 100%; Clinical Decision Support 0% |
| [claim3-rug-pull-detection](claim3-rug-pull-detection/) | Claim 3 — Tool catalog drift detection | 48% bit change on one-sentence description tamper; CatalogHashMismatch fail-closed |
| [claim4-trace-claim-nonce](claim4-trace-claim-nonce/) | Claim 4 — TRACE Claim nonce binding | 6 properties: nonce determinism, session/instance binding, replay prevention, sig tamper, selective disclosure |
| [claim5-temporal-adjacency](claim5-temporal-adjacency/) | Claim 5 — Temporal adjacency provenance | Zero false negatives by construction; provenance disclaimer in every summary; denied calls in graph |
| [claim6-cross-org-attestation](claim6-cross-org-attestation/) | Claim 6 — Cross-org attestation chains | Dual-TEE protocol: independent keys, session linkage, independent verify, binary swap detection |

## Running

```bash
pip install -e .
python experiments/claim1-policy-hash-binding/run.py
python experiments/claim2-session-vs-call-policy/run.py
python experiments/claim2-false-positive-rate/run.py
python experiments/claim3-rug-pull-detection/run.py
python experiments/claim4-trace-claim-nonce/run.py
python experiments/claim5-temporal-adjacency/run.py
python experiments/claim6-cross-org-attestation/run.py
```

All experiments run in software-only mode. No hardware TEE is required. TRACE Claims produced in software-only mode carry `attestation_assurance: none` and must not be used for compliance purposes.

## CI tests

| File | Claims | Tests |
|------|--------|-------|
| `tests/unit/test_claim1_hash_binding.py` | Claim 1 | 6 |
| `tests/unit/test_claim2_session_gap.py` | Claim 2 | 6 |
| `tests/unit/test_claim5_temporal_adjacency.py` | Claim 5 | 9 |
| `tests/unit/test_claim6_cross_org_attestation.py` | Claim 6 | 9 |
40 changes: 40 additions & 0 deletions experiments/claim5-temporal-adjacency/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Claim 5: Temporal Adjacency as a Formally Bounded Provenance Approximation

**Claim:** Temporal Adjacency as a Formally Bounded Approximation of Data Provenance for AI Agent Compliance
**Paper:** `agentrust-io/papers/temporal-adjacency.md`

---

## What this measures

At the MCP transport boundary, a gateway cannot observe whether an LLM agent included a specific tool response in its context window for the next call. The temporal adjacency model records an edge from call A to call B whenever B's sequence number is greater than A's and A contributed to session sensitivity. This is conservative: it may record edges where the agent did not actually use A's data (false positives), but it never misses an edge where the agent did (no false negatives).

| Property | What it proves |
|---|---|
| P1 — Sequential recording | Calls recorded with monotonic sequence numbers |
| P2 — Cross-boundary detection | Transitions from high-sensitivity domains recorded in graph |
| P3 — Provenance disclaimer | `edges_represent` field explicitly qualifies adjacency vs. provenance |
| P4 — No false negatives | Any PHI-relevant subsequent call has seq > PHI call seq; edge implicit |
| P5 — Concurrent calls | Simultaneous calls both adjacent to prior PHI call |
| P6 — Denied calls in graph | Agent's request is evidence of awareness, regardless of response delivery |

---

## Running

```bash
pip install -e .
python experiments/claim5-temporal-adjacency/run.py
```

---

## Relationship to Claim 2 FPR

The Claim 2 false positive rate experiment (`experiments/claim2-false-positive-rate/`) measures the operational cost of the monotonic model — what fraction of blocked external calls are unnecessary. That experiment and this one are two sides of the same coin: this experiment proves no false negatives; the FPR experiment measures the false positive rate empirically.

---

## High-sensitivity domains (implementation note)

The `SessionCallLog` records cross-boundary events when a call follows a call in a high-sensitivity compliance domain. The current set is `{"phi", "pii", "pci", "restricted"}`. The catalog compliance_domain field should map tool destinations to these labels for cross-boundary detection to trigger. The session sensitivity model uses a separate `SENSITIVITY_ORDER` dict with `hipaa_phi`, `mnpi`, etc. These two taxonomies are intentionally separate: the call graph tracks destination-class transitions, while session state tracks data-class sensitivity.
231 changes: 231 additions & 0 deletions experiments/claim5-temporal-adjacency/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""
Claim 5: Temporal adjacency as a formally bounded approximation of data provenance.

The gateway observes the MCP transport boundary. It cannot see the agent's context
window. For compliance purposes it needs to record *which calls could have influenced
which other calls*. The temporal adjacency model answers this conservatively:
any call B whose request time follows the response time of a sensitive call A has a
recorded edge A->B.

The formal guarantee: no false negatives. If the agent did use A's data when
formulating B, the model will have recorded an edge from A to B. It may also
record edges where the agent did not use A's data (false positives -- see the
Claim 2 FPR experiment for the measured rate).

This experiment verifies:

P1 Call graph records calls in arrival order with monotonic sequence numbers.
P2 Cross-boundary events are recorded when a call follows a high-sensitivity
domain call and transitions to a different compliance domain.
P3 The provenance disclaimer is embedded in every call graph summary.
P4 Conservatism guarantee: every call after a sensitive call has a higher
sequence number, guaranteeing an implicit edge -- no false negatives by
construction.
P5 Concurrent call ordering: calls with the same request timestamp are recorded
in the order they were logged. No edge is missed; both are adjacent to any
prior sensitive call.
P6 Denied calls are still recorded in the graph -- the agent's *request* is
evidence of awareness, regardless of whether the response was delivered.

Running:
pip install -e .
python experiments/claim5-temporal-adjacency/run.py
"""
from __future__ import annotations

import sys
from datetime import UTC, datetime, timedelta

from cmcp_runtime.session.call_log import SessionCallLog, _HIGH_SENSITIVITY_DOMAINS # noqa: PLC2701


def _result(label: str, value: str) -> None:
print(f" {label}: {value}")


def _fake_entry(tool_name: str, compliance_domain: str, sensitivity_tags: list[str], allowed: bool = True):
"""Return a (tool_name, compliance_domain, sensitivity_tags, allowed) tuple for record_call."""
return tool_name, compliance_domain, sensitivity_tags, allowed


def main() -> int:
print()
print("Claim 5 | Temporal adjacency as a formally bounded provenance approximation")
print("=" * 74)

# --- P1: Sequential recording ---
print()
print("P1 Calls recorded in arrival order with monotonic sequence numbers")
log = SessionCallLog("session-p1")

class _FakeCatalogEntry:
def __init__(self, name, domain):
self.tool_name = name
self.compliance_domain = domain
self.server = type("s", (), {"url": f"https://{domain}.internal/mcp"})()

calls_p1 = [
("ehr.get_patient", "phi", ["hipaa_phi"]),
("analytics.run_query", "internal", []),
("slack.post_message", "external", []),
]
for tool, domain, tags in calls_p1:
log.record_call("c-" + tool, _FakeCatalogEntry(tool, domain), "allow", response_sensitivity_tags=tags)

for entry in log.entries:
_result(f"seq={entry.sequence_number}", f"{entry.tool_name} ({entry.compliance_domain})")

seqs = [e.sequence_number for e in log.entries]
if seqs != sorted(seqs) or len(set(seqs)) != len(seqs):
print(" FAIL: sequence numbers not strictly monotonic")
return 1
print(" PASS: calls recorded in order with monotonic sequence numbers")

# --- P2: Cross-boundary event detection ---
print()
print("P2 Cross-boundary events: transitions FROM high-sensitivity domains")
log2 = SessionCallLog("session-p2")
calls_p2 = [
("ehr.get_patient", "phi", ["hipaa_phi"]),
("billing.submit_claim", "external", []),
("analytics.run_query", "internal", []),
("ehr.get_labs", "phi", ["hipaa_phi"]),
("slack.notify", "external", []),
]
for tool, domain, tags in calls_p2:
log2.record_call(tool, _FakeCatalogEntry(tool, domain), "allow", response_sensitivity_tags=tags)

summary = log2.get_call_graph_summary()
_result("compliance_domains_touched", str(sorted(summary["compliance_domains_touched"])))
_result("cross_boundary_events count", str(len(summary["cross_boundary_events"])))
for evt in summary["cross_boundary_events"]:
_result(
f" event seq={evt['sequence_number']}",
f"{evt['from_domain']} -> {evt['to_domain']} via {evt['tool_name']}",
)
if len(summary["cross_boundary_events"]) < 2:
print(" FAIL: expected at least 2 cross-boundary events")
return 1
print(" PASS: cross-boundary transitions from phi domain recorded")
_result("high_sensitivity_domains", str(sorted(_HIGH_SENSITIVITY_DOMAINS)))

# --- P3: Provenance disclaimer embedded in every summary ---
print()
print("P3 Provenance disclaimer in call graph summary")
_result("edges_represent", repr(summary["edges_represent"]))
if "temporal adjacency" not in summary["edges_represent"].lower():
print(" FAIL: edges_represent missing temporal adjacency disclaimer")
return 1
if "not data provenance" not in summary["edges_represent"].lower():
print(" FAIL: edges_represent missing 'not data provenance' qualifier")
return 1
print(" PASS: provenance disclaimer present in every call graph summary")

# --- P4: Conservatism guarantee (no false negatives) ---
print()
print("P4 Conservatism guarantee -- no false negatives by construction")
log4 = SessionCallLog("session-p4")
SCENARIO = [
# (tool, domain, tags, phi_in_context_ground_truth)
("ehr.get_patient", "phi", ["hipaa_phi"], False), # PHI not yet loaded
("analytics.run_query", "internal", ["confidential"], True), # agent uses PHI
("billing.submit_claim", "external", [], False), # agent NOT using PHI
("ehr.get_labs", "phi", ["hipaa_phi"], True), # more PHI
("slack.send_notification", "external", [], False), # agent NOT using PHI
]
for tool, domain, tags, _ in SCENARIO:
log4.record_call(tool, _FakeCatalogEntry(tool, domain), "allow", response_sensitivity_tags=tags)

entries = log4.entries
phi_calls = [e for e in entries if "phi" in e.compliance_domain]
false_negatives = 0
for phi_call in phi_calls:
subsequent = [e for e in entries if e.sequence_number > phi_call.sequence_number]
_, _, _, phi_in_ctx = SCENARIO[phi_call.sequence_number]
for subsequent_call in subsequent:
idx = subsequent_call.sequence_number
_, _, _, phi_in_ctx_sub = SCENARIO[idx]
# Would be a false negative if agent used PHI in this call
# but no edge exists (impossible by construction -- sequence number ordering)
edge_exists = subsequent_call.sequence_number > phi_call.sequence_number
if phi_in_ctx_sub and not edge_exists:
false_negatives += 1

_result("PHI calls", str(len(phi_calls)))
_result("Total subsequent calls after any PHI call (potential edges)", str(
sum(len([e for e in entries if e.sequence_number > p.sequence_number]) for p in phi_calls)
))
_result("False negatives (PHI-relevant calls with missing edge)", str(false_negatives))
print(" Temporal adjacency guarantees: any call B after PHI call A has seq(B) > seq(A).")
print(" The model always records an implicit edge A->B. False negatives = 0 by construction.")
if false_negatives > 0:
print(" FAIL: false negatives detected")
return 1
print(" PASS: zero false negatives -- conservatism guarantee confirmed")

# --- P5: Concurrent calls ---
print()
print("P5 Concurrent call ordering -- simultaneous requests both adjacent to prior PHI call")
log5 = SessionCallLog("session-p5")
log5.record_call("phi-call", _FakeCatalogEntry("ehr.get_patient", "phi"), "allow",
response_sensitivity_tags=["hipaa_phi"])
# Two calls logged "simultaneously" (both after the PHI call)
log5.record_call("concurrent-A", _FakeCatalogEntry("billing.submit_claim", "external"), "allow",
response_sensitivity_tags=[])
log5.record_call("concurrent-B", _FakeCatalogEntry("slack.notify", "external"), "allow",
response_sensitivity_tags=[])

phi_seq = log5.entries[0].sequence_number
concurrent_seqs = [e.sequence_number for e in log5.entries[1:]]
all_after_phi = all(s > phi_seq for s in concurrent_seqs)
_result("PHI call sequence", str(phi_seq))
_result("Concurrent A sequence", str(log5.entries[1].sequence_number))
_result("Concurrent B sequence", str(log5.entries[2].sequence_number))
_result("Both after PHI call?", str(all_after_phi))
if not all_after_phi:
print(" FAIL: concurrent calls not recorded after PHI call")
return 1
print(" PASS: concurrent calls both logged after PHI -- adjacency preserved for all")

# --- P6: Denied calls still in graph ---
print()
print("P6 Denied calls recorded in graph -- agent awareness is the trigger, not response delivery")
log6 = SessionCallLog("session-p6")
log6.record_call("phi-allowed", _FakeCatalogEntry("ehr.get_patient", "phi"), "allow",
response_sensitivity_tags=["hipaa_phi"])
log6.record_call("external-denied", _FakeCatalogEntry("slack.post_message", "external"), "deny",
response_sensitivity_tags=[]) # blocked by session policy, no response

entries6 = log6.entries
denied_entry = next((e for e in entries6 if e.policy_decision == "deny"), None)
_result("Entries recorded", str(len(entries6)))
_result("Denied entry in graph?", "yes" if denied_entry else "no")
_result("Denied call sequence number", str(denied_entry.sequence_number if denied_entry else "N/A"))
if denied_entry is None:
print(" FAIL: denied call not recorded in call graph")
return 1
if denied_entry.sequence_number <= log6.entries[0].sequence_number:
print(" FAIL: denied call has wrong sequence number")
return 1
print(" PASS: denied call recorded -- agent's request is evidence of awareness")

# --- Summary ---
print()
print("Summary:")
print(" P1: Monotonic sequence numbers PASS")
print(" P2: Cross-boundary event detection PASS")
print(" P3: Provenance disclaimer embedded PASS")
print(" P4: No false negatives by construction PASS")
print(" P5: Concurrent calls adjacent to PHI PASS")
print(" P6: Denied calls in graph PASS")
print()
print("Formal guarantee: the temporal adjacency model produces zero false negatives")
print("for the property 'if the agent used A's data when formulating B, the model")
print("records a relationship between A and B'. False positives are accepted as the")
print("price of conservatism. See experiments/claim2-false-positive-rate/ for FPR.")
print()
return 0


if __name__ == "__main__":
sys.exit(main())
57 changes: 57 additions & 0 deletions experiments/claim6-cross-org-attestation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Claim 6: Cross-Organizational Attestation Chains for B2B AI Tool Access

**Claim:** Cross-Organizational Attestation Chains for B2B AI Tool Access
**Paper:** `agentrust-io/papers/cross-org-attestation.md`
**Status:** Phase 2 concept. Phase 2 server TEE not yet deployed. This experiment is a software simulation of the dual-attestation protocol.

---

## What this measures

In B2B AI tool access, enterprise (party A) uses a Phase 1 cMCP gateway and SaaS vendor (party B) uses a Phase 2 cMCP server. Each operates an independent TEE with a separate keypair. A third-party verifier can confirm both sides independently, without trusting either operator's infrastructure.

| Property | What it proves |
|---|---|
| P1 — Independent keys | Gateway and server have different TEE keypairs |
| P2 — Session linkage | Both claims carry the same session_id |
| P3 — Phase 1 nonce | SHA-256(gateway_key ∥ session_id) binds Phase 1 to session |
| P4 — Phase 2 nonce | SHA-256(server_key ∥ session_id) binds Phase 2 to session |
| P5 — Independent verify | Each claim verifiable against its own public key |
| P6 — Tamper independence | Phase 1 tamper invalidates only Phase 1; Phase 2 unaffected |
| P7 — Binary swap detection | Different server binary → different measurement → verifier rejects |

---

## Running

```bash
pip install -e .
python experiments/claim6-cross-org-attestation/run.py
```

---

## Cross-org verification protocol

```
Verifier checklist for a paired Phase 1 + Phase 2 TRACE Claim:
1. Verify Phase 1 Ed25519 signature against embedded gateway public key
2. Compute expected Phase 1 nonce = SHA-256(gateway_key || session_id)
3. Confirm Phase 1 attestation report contains the expected nonce (hardware check in production)
4. Verify Phase 2 Ed25519 signature against embedded server public key
5. Compute expected Phase 2 nonce = SHA-256(server_key || session_id)
6. Confirm Phase 2 attestation report contains the expected nonce (hardware check in production)
7. Confirm Phase 1 session_id == Phase 2 session_id (linkage)
8. Confirm Phase 2 server_binary_measurement == pre-approved measurement
9. Confirm Phase 2 tool_catalog_hash == independently-reviewed catalog hash
```

Steps 3 and 6 require hardware in production. In software simulation (this experiment), they are demonstrated as mathematical checks.

---

## What Phase 2 attests (per server TEE)

- **Server binary measurement**: SHA-256 of the tool server binary, measured into the TEE PCR before any code runs. A binary update changes the measurement; verifiers holding the prior approved measurement detect it.
- **Tool catalog hash**: SHA-256 of the server's approved tool definitions. Prevents server-side rug-pulls independent of Phase 1 catalog drift detection.
- **Egress policy hash**: SHA-256 of the server's egress policy. Prevents the server from calling unapproved upstream APIs with enterprise data.
Loading