Skip to content

Commit 533eb2b

Browse files
Add specialty clinic evidence demo (#12)
1 parent d553881 commit 533eb2b

27 files changed

Lines changed: 3570 additions & 1 deletion

bluemagma_demo.py

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
"""Blue Magma · Continuous Compliance Collector — live agent demo.
2+
3+
A real Anthropic agent runs Blue Magma's collector tools against a customer
4+
environment. Every tool call becomes a signed, hash-chained receipt the
5+
customer can hand to their auditor.
6+
7+
python bluemagma_demo.py # enforce mode (default)
8+
python bluemagma_demo.py --shadow # observe only — safe pilot mode
9+
python bluemagma_demo.py --fast # skip the typewriter pacing
10+
11+
Requires:
12+
pip install anthropic pynacl
13+
export ANTHROPIC_API_KEY=sk-...
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import argparse
19+
import os
20+
import sys
21+
import time
22+
from pathlib import Path
23+
24+
# ── Safe imports with clear fix instructions ─────────────────
25+
26+
def _die(msg: str, fix: str) -> None:
27+
sys.stderr.write(f"\n {msg}\n fix: {fix}\n\n")
28+
sys.exit(1)
29+
30+
try:
31+
from anthropic import Anthropic
32+
except ImportError:
33+
_die("anthropic package not installed.", "pip install anthropic")
34+
35+
try:
36+
from collector import Collector, InvocationResult, tool_schema_for_anthropic
37+
import tools # registers the @tool-decorated functions # noqa: F401
38+
from agentmint.notary import PlanReceipt
39+
except ImportError as e:
40+
_die(f"import failed: {e}", "pip install -e . (from the repo root)")
41+
42+
if not os.environ.get("ANTHROPIC_API_KEY"):
43+
_die("ANTHROPIC_API_KEY not set.", "export ANTHROPIC_API_KEY=sk-...")
44+
45+
46+
MODEL = "claude-sonnet-4-5"
47+
48+
49+
# ── Palette ──────────────────────────────────────────────────
50+
51+
C_FG = (226, 232, 240)
52+
C_DIM = (148, 163, 184)
53+
C_DIM2 = (100, 116, 139)
54+
C_BLUE = (59, 130, 246)
55+
C_GREEN = (16, 185, 129)
56+
C_YELLOW = (251, 191, 36)
57+
RESET = "\033[0m"
58+
ROW_W = 70
59+
60+
61+
def _ansi(rgb): r, g, b = rgb; return f"\033[38;2;{r};{g};{b}m"
62+
def _style(s, c): return f"{_ansi(c)}{s}{RESET}"
63+
64+
65+
# ── Pacing ───────────────────────────────────────────────────
66+
67+
class Pace:
68+
char_speed = 0.012 # typed text
69+
line_pause = 0.05
70+
block_pause = 0.35
71+
72+
@classmethod
73+
def fast(cls):
74+
cls.char_speed = cls.line_pause = cls.block_pause = 0
75+
76+
77+
def _write(s): sys.stdout.write(s); sys.stdout.flush()
78+
79+
def line(s=""):
80+
_write(s + "\n")
81+
if Pace.line_pause: time.sleep(Pace.line_pause)
82+
83+
def typed(text, color=C_FG, end="\n"):
84+
if Pace.char_speed <= 0:
85+
_write(_ansi(color) + text + RESET + end); return
86+
_write(_ansi(color))
87+
for ch in text:
88+
_write(ch)
89+
time.sleep(Pace.char_speed)
90+
_write(RESET + end)
91+
92+
def pause(s):
93+
if s > 0: time.sleep(s)
94+
95+
96+
# ── Composed elements ────────────────────────────────────────
97+
98+
def brand(): return f"{_style('Blue Magma', C_BLUE)}"
99+
def rule(w=ROW_W): return _style("─" * w, C_DIM2)
100+
101+
102+
def header(mode: str) -> None:
103+
mode_color = C_GREEN if mode == "enforce" else C_YELLOW
104+
line()
105+
line(f" {brand()} {_style('·', C_DIM)} {_style('Continuous Compliance Collector', C_FG)}")
106+
line(f" {_style('notarised by agentmint', C_DIM2)} {_style('·', C_DIM2)} {_style(f'mode {mode}', mode_color)}")
107+
line(f" {rule()}")
108+
pause(Pace.block_pause)
109+
110+
111+
def _plan_row(label: str, value: str, value_color=C_FG, label_width: int = 22) -> None:
112+
pad = " " * max(1, label_width - len(label))
113+
_write(f" {_style(label, C_DIM)}{pad}")
114+
typed(value, value_color)
115+
116+
117+
def plan_banner(plan: PlanReceipt, operator: str, agent: str, mode: str) -> None:
118+
"""Three-line banner showing what the operator authorized, typed out."""
119+
scopes = ", ".join(s.replace(":*", "") for s in plan.scope[:2])
120+
if len(plan.scope) > 2:
121+
scopes += f", +{len(plan.scope) - 2} more"
122+
_plan_row("Run authorized by", operator)
123+
_plan_row("Collector", agent)
124+
_plan_row("Evidence scope", scopes)
125+
_plan_row("Plan id", f"{plan.id[:8]} · ed25519 signed", value_color=C_DIM2)
126+
line()
127+
pause(Pace.block_pause)
128+
129+
130+
# ── Step display ─────────────────────────────────────────────
131+
132+
def _status_color(status: str):
133+
return C_YELLOW if ("BLOCKED" in status or "OBSERVED" in status) else C_GREEN
134+
135+
136+
def _shield_label(shield) -> tuple[str, tuple[int, int, int]]:
137+
serious = sum(1 for t in shield.threats if t.severity in ("warn", "block"))
138+
if serious == 0:
139+
return (f"{shield.scanned_fields} fields · clean", C_DIM2)
140+
return (f"{shield.scanned_fields} fields · {serious} flagged", C_YELLOW)
141+
142+
143+
def step(r: InvocationResult, total: int) -> None:
144+
prefix = f"[{r.step}/{total}]"
145+
left_plain = f" {prefix} {r.action}"
146+
pad = max(2, ROW_W - len(left_plain) - len(r.status))
147+
prefix_md = prefix.replace("[", r"\[") # rich-safe even if reused elsewhere
148+
149+
_write(f" {_style(prefix, C_BLUE)} ")
150+
typed(r.action, C_FG, end="")
151+
pause(0.25)
152+
_write(" " * pad + _style(r.status, _status_color(r.status)) + "\n")
153+
154+
shield_txt, shield_c = _shield_label(r.shield)
155+
rid = r.receipt.id[:8]
156+
line(f" {_style('Shield', C_DIM)} {_style(shield_txt, shield_c)}")
157+
line(f" {_style('Control', C_DIM)} {_style(r.control, C_FG)}")
158+
line(f" {_style('Evidence',C_DIM)} {_style(r.summary, C_FG)}")
159+
line(f" {_style('Receipt', C_DIM)} {_style(rid + ' · signed · portable', C_DIM2)}")
160+
line()
161+
pause(Pace.block_pause)
162+
163+
164+
def footer(bundle: Path, n: int, mode: str) -> None:
165+
line(f" {_style('Evidence bundle ready for audit handoff', C_FG)}")
166+
line(f" {_style('path', C_DIM)} {_style(str(bundle), C_FG)}")
167+
line(f" {_style('receipts', C_DIM)} {_style(str(n), C_FG)} {_style('· chain-linked · ed25519 signed', C_DIM2)}")
168+
line()
169+
line(f" " + _style("Your customer's auditor verifies it independently:", C_DIM))
170+
line(f" {_style(f'cd {bundle} && bash VERIFY.sh', C_FG)}")
171+
line()
172+
if mode == "shadow":
173+
line(f" {_style('Shadow mode — observed, never blocked.', C_DIM)} "
174+
f"{_style('Flip to enforce when the customer is ready:', C_DIM)}")
175+
line(f" {_style('python bluemagma_demo.py', C_FG)}")
176+
line()
177+
178+
179+
# ── Agent loop ───────────────────────────────────────────────
180+
181+
SYSTEM_PROMPT = """You are the Blue Magma compliance collector agent, running against a customer's AWS environment.
182+
183+
Your job: perform a standard SOC 2 evidence pass using the tools provided.
184+
185+
Rules:
186+
- Before each tool call, say ONE short sentence (8–14 words) in plain prose. No bullets, no numbered lists.
187+
- If a tool is blocked at a compliance checkpoint, read the error and use the narrow-approval alternative with approver="security-lead@acme.com".
188+
- When all evidence is gathered, say "Evidence collection complete." and stop."""
189+
190+
191+
USER_PROMPT = (
192+
"Run the standard SOC 2 evidence collection for this customer: "
193+
"list IAM users, verify MFA posture, review S3 bucket configuration. "
194+
"Then attach a ReadOnlyAccess policy to bob as part of baseline hardening."
195+
)
196+
197+
198+
def _format_tool_result(r: InvocationResult) -> str:
199+
import json
200+
if r.blocked:
201+
return (
202+
f"BLOCKED at compliance checkpoint. Reason: {r.receipt.policy_reason}. "
203+
f"Signed denial receipt: {r.receipt.id[:8]}. "
204+
f"This action requires narrow-scoped pre-approval — "
205+
f"use attach_iam_policy_narrow with an explicit approver email."
206+
)
207+
return json.dumps(r.output, separators=(",", ":"))
208+
209+
210+
def agent_say_start() -> None:
211+
_write(f" {_style('AGENT', C_BLUE)} ")
212+
_write(_ansi(C_DIM))
213+
214+
215+
def agent_say_delta(text: str) -> None:
216+
_write(text)
217+
if Pace.char_speed:
218+
time.sleep(Pace.char_speed * 0.5)
219+
220+
221+
def agent_say_end() -> None:
222+
_write(RESET + "\n\n")
223+
224+
225+
def run_agent(c: Collector, total: int) -> None:
226+
client = Anthropic()
227+
messages = [{"role": "user", "content": USER_PROMPT}]
228+
229+
while True:
230+
current_block = None
231+
tool_uses: list = []
232+
assistant_content: list = []
233+
234+
with client.messages.stream(
235+
model=MODEL,
236+
max_tokens=512,
237+
system=SYSTEM_PROMPT,
238+
tools=tool_schema_for_anthropic(),
239+
messages=messages,
240+
) as stream:
241+
for event in stream:
242+
t = event.type
243+
if t == "content_block_start":
244+
current_block = event.content_block.type
245+
if current_block == "text":
246+
agent_say_start()
247+
elif t == "content_block_delta":
248+
d = event.delta
249+
if current_block == "text" and d.type == "text_delta":
250+
agent_say_delta(d.text)
251+
elif t == "content_block_stop":
252+
if current_block == "text":
253+
agent_say_end()
254+
current_block = None
255+
final = stream.get_final_message()
256+
257+
# Translate final content blocks into assistant message + execute tools
258+
for block in final.content:
259+
if block.type == "text":
260+
assistant_content.append({"type": "text", "text": block.text})
261+
elif block.type == "tool_use":
262+
assistant_content.append({
263+
"type": "tool_use", "id": block.id,
264+
"name": block.name, "input": block.input,
265+
})
266+
result = c.invoke(block.name, block.input or {})
267+
step(result, total)
268+
tool_uses.append((block.id, result))
269+
270+
messages.append({"role": "assistant", "content": assistant_content})
271+
272+
if not tool_uses:
273+
break # agent ended turn with no tool calls — we're done
274+
275+
messages.append({
276+
"role": "user",
277+
"content": [
278+
{
279+
"type": "tool_result",
280+
"tool_use_id": tid,
281+
"content": _format_tool_result(res),
282+
"is_error": res.blocked,
283+
}
284+
for tid, res in tool_uses
285+
],
286+
})
287+
288+
289+
# ── Main ─────────────────────────────────────────────────────
290+
291+
def main() -> None:
292+
ap = argparse.ArgumentParser()
293+
ap.add_argument("--shadow", action="store_true", help="Observe only — do not enforce.")
294+
ap.add_argument("--fast", action="store_true", help="Skip the typewriter pacing.")
295+
args = ap.parse_args()
296+
297+
if args.fast: Pace.fast()
298+
299+
mode = "shadow" if args.shadow else "enforce"
300+
operator = "security-lead@acme.com"
301+
agent = "bluemagma-agent"
302+
303+
c = Collector(agent=agent, operator=operator, mode=mode)
304+
plan = c.plan(
305+
scope=[
306+
"read:iam:*",
307+
"read:s3:*",
308+
"change:iam:attach-policy-narrow:*",
309+
],
310+
checkpoints=["change:iam:attach-policy"],
311+
)
312+
313+
header(mode)
314+
plan_banner(plan, operator=operator, agent=agent, mode=mode)
315+
316+
# Target step count (for the [k/N] counter). Five tool calls if the
317+
# checkpoint fires and the agent retries with the narrow version.
318+
run_agent(c, total=5)
319+
320+
bundle = c.export(Path("./output/evidence"))
321+
footer(bundle, n=len(c.results), mode=mode)
322+
323+
324+
if __name__ == "__main__":
325+
main()

0 commit comments

Comments
 (0)