Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 76 additions & 67 deletions tests/cli/test_e2e.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,112 @@
from __future__ import annotations

import os
import subprocess
import textwrap
import sys
from pathlib import Path

from agentmint import Notary

import pytest
ROOT = Path(__file__).resolve().parents[2]


def _write_agent(tmp_path):
(tmp_path / "agent.py").write_text(
textwrap.dedent(
"""
from agentmint import Notary, notarise
notary = Notary()
def run_cli(*args: str, cwd: Path) -> subprocess.CompletedProcess:
env = os.environ.copy()
env["PYTHONPATH"] = str(ROOT) + os.pathsep + env.get("PYTHONPATH", "")
return subprocess.run(
[sys.executable, "-c", "from agentmint.cli.app import app; app()", *args],
cwd=cwd,
env=env,
text=True,
capture_output=True,
check=False,
)

@notarise(notary, action="test:hello")
def greet(name):
return {"greeting": f"hello {name}"}

if __name__ == "__main__":
greet("world")
"""
)
def write_receipt(tmp_path: Path) -> Path:
notary = Notary(key=tmp_path / ".agentmint" / "keys")
plan = notary.create_plan(user="cli@test", action="cli", scope=["*"], ttl_seconds=60)
receipt = notary.notarise(
action="files:read",
agent="cli-agent",
plan=plan,
evidence={"path": "demo.txt"},
enable_timestamp=False,
)
receipt_path = tmp_path / "receipts" / "receipt.json"
receipt_path.parent.mkdir(exist_ok=True)
receipt_path.write_text(receipt.to_json())
return receipt_path


def test_help_renders(tmp_path):
result = run_cli("--help", cwd=ROOT)

assert result.returncode == 0
assert "AgentMint" in result.stdout
assert "init" in result.stdout
assert "doctor" in result.stdout
assert "verify" in result.stdout


def test_init_creates_workspace(tmp_path):
result = subprocess.run(
["agentmint", "init", "--yes"], cwd=tmp_path, capture_output=True, text=True
)
(tmp_path / "agent.py").write_text("def submit_prior_auth(cpt_code, icd10_code, patient_id):\n return True\n")

result = run_cli("init", "--yes", cwd=tmp_path)

assert result.returncode == 0
assert (tmp_path / ".agentmint" / "config.toml").exists()
assert (tmp_path / ".agentmint" / "keys").is_dir()
assert (tmp_path / "receipts").is_dir()
assert (tmp_path / ".gitignore").read_text().count(".agentmint/") == 1


def test_three_minute_flow(tmp_path):
subprocess.run(["agentmint", "init", "--yes"], cwd=tmp_path, check=True)
_write_agent(tmp_path)
result = subprocess.run(["python3", "agent.py"], cwd=tmp_path, capture_output=True, text=True)
assert result.returncode == 0
receipts = list((tmp_path / "receipts").rglob("*.json"))
assert len(receipts) == 1
verify_result = subprocess.run(
["agentmint", "verify", str(receipts[0])], cwd=tmp_path, capture_output=True, text=True
)
assert verify_result.returncode == 0
assert "valid" in verify_result.stdout.lower()
def test_doctor_green_on_fresh_init(tmp_path):
run_cli("init", "--yes", cwd=tmp_path)

result = run_cli("doctor", cwd=tmp_path)

def test_doctor_passes_on_fresh_init(tmp_path):
subprocess.run(["agentmint", "init", "--yes"], cwd=tmp_path, check=True)
result = subprocess.run(["agentmint", "doctor"], cwd=tmp_path, capture_output=True, text=True)
assert result.returncode == 0
assert "healthy" in result.stdout.lower() or "needs attention" in result.stdout.lower()
assert "Result:" in result.stdout
assert "needs attention" in result.stdout.lower() or "healthy" in result.stdout.lower()


def test_privacy_zero_network_default(tmp_path):
subprocess.run(["agentmint", "init", "--yes"], cwd=tmp_path, check=True)
result = subprocess.run(["agentmint", "privacy"], cwd=tmp_path, capture_output=True, text=True)
run_cli("init", "--yes", cwd=tmp_path)

result = run_cli("privacy", cwd=tmp_path)

assert result.returncode == 0
assert "None" in result.stdout
assert ".agentmint" in result.stdout


def test_init_detects_healthcare(tmp_path):
(tmp_path / "agent.py").write_text(
"def submit_prior_auth(cpt_code, icd10_code, patient_id):\n"
" # HIPAA-compliant submission to payer\n"
" pass\n"
)
result = subprocess.run(
["agentmint", "init"], cwd=tmp_path, input="n\n", capture_output=True, text=True
)
assert "healthcare" in result.stdout.lower()
def test_verify_validates_generated_receipt(tmp_path):
run_cli("init", "--yes", cwd=tmp_path)
receipt_path = write_receipt(tmp_path)

result = run_cli("verify", str(receipt_path), cwd=tmp_path)

assert result.returncode == 0
assert "valid" in result.stdout.lower()


def test_show_renders_receipt(tmp_path):
subprocess.run(["agentmint", "init", "--yes"], cwd=tmp_path, check=True)
_write_agent(tmp_path)
subprocess.run(["python3", "agent.py"], cwd=tmp_path, check=True)
receipt_path = next((tmp_path / "receipts").rglob("*.json"))
result = subprocess.run(
["agentmint", "show", str(receipt_path)], cwd=tmp_path, capture_output=True, text=True
)
run_cli("init", "--yes", cwd=tmp_path)
receipt_path = write_receipt(tmp_path)

result = run_cli("show", str(receipt_path), cwd=tmp_path)

assert result.returncode == 0
assert "Receipt" in result.stdout
assert "Signature" in result.stdout


def test_no_color_flag_strips_ansi(tmp_path):
subprocess.run(["agentmint", "init", "--yes"], cwd=tmp_path, check=True)
result = subprocess.run(
["agentmint", "--no-color", "doctor"], cwd=tmp_path, capture_output=True, text=True
)
assert "\033[" not in result.stdout
def test_show_raw_renders_json(tmp_path):
run_cli("init", "--yes", cwd=tmp_path)
receipt_path = write_receipt(tmp_path)

result = run_cli("show", str(receipt_path), "--raw", cwd=tmp_path)

def test_init_interactive(tmp_path):
pexpect = pytest.importorskip("pexpect")
child = pexpect.spawn("agentmint init", cwd=str(tmp_path), timeout=10)
child.expect("Apply suggestions")
child.sendline("y")
child.expect_exact("Setup complete")
child.expect(pexpect.EOF)
assert (tmp_path / ".agentmint").exists()
assert result.returncode == 0
assert '"type": "notarised_evidence"' in result.stdout
67 changes: 67 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, List

import pytest

from agentmint.notary import Notary


class RecordingSink:
"""In-memory sink used by integration-style tests."""

def __init__(self) -> None:
self.receipts: List[Any] = []

def emit(self, receipt: Any) -> None:
self.receipts.append(receipt)

def flush(self) -> None:
pass

def close(self) -> None:
pass


@pytest.fixture
def tmp_key_dir(tmp_path: Path) -> Path:
return tmp_path / "keys"


@pytest.fixture
def recording_sink() -> RecordingSink:
return RecordingSink()


@pytest.fixture
def notary(tmp_key_dir: Path, recording_sink: RecordingSink) -> Notary:
return Notary(key=tmp_key_dir, sink=recording_sink)


@pytest.fixture
def plan(notary: Notary):
return notary.create_plan(
user="tests@agentmint.dev",
action="tests",
scope=["tool:*", "read:*", "write:*"],
delegates_to=["test-agent"],
)


@pytest.fixture
def aerf_schema() -> dict[str, Any]:
schema_path = Path(__file__).parent.parent / "schemas" / "aerf-v0.1.json"
return json.loads(schema_path.read_text())


@pytest.fixture
def receipt(notary: Notary, plan):
return notary.notarise(
action="tool:test",
agent="test-agent",
plan=plan,
evidence={"key": "value"},
enable_timestamp=False,
)
43 changes: 43 additions & 0 deletions tests/strategies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations

import random
import string
from typing import Dict, Iterable, Iterator, Tuple


_ACTION_ALPHABET = string.ascii_lowercase + string.digits + "_-"
_EVIDENCE_KEY_ALPHABET = string.ascii_letters + string.digits + "_"


def iter_actions(seed: int = 0, count: int = 25) -> Iterator[str]:
rng = random.Random(seed)
for _ in range(count):
part_count = rng.randint(1, 4)
parts = []
for _ in range(part_count):
size = rng.randint(1, 12)
parts.append("".join(rng.choice(_ACTION_ALPHABET) for _ in range(size)))
yield ":".join(parts)


def iter_evidence(seed: int = 0, count: int = 25) -> Iterator[Dict[str, object]]:
rng = random.Random(seed + 1)
for _ in range(count):
item_count = rng.randint(1, 6)
evidence: Dict[str, object] = {}
for idx in range(item_count):
key_len = rng.randint(1, 12)
key = "".join(rng.choice(_EVIDENCE_KEY_ALPHABET) for _ in range(key_len))
choice = (idx + rng.randint(0, 2)) % 3
if choice == 0:
evidence[key] = rng.randint(-1000, 1000)
elif choice == 1:
evidence[key] = bool(rng.randint(0, 1))
else:
value_len = rng.randint(0, 32)
evidence[key] = "".join(rng.choice(_ACTION_ALPHABET) for _ in range(value_len))
yield evidence


def iter_action_evidence_cases(seed: int = 0, count: int = 25) -> Iterable[Tuple[str, Dict[str, object]]]:
return zip(iter_actions(seed=seed, count=count), iter_evidence(seed=seed, count=count))
Loading
Loading