
mukund1985/agentic-safety-patterns


agentic-safety-patterns


A production pattern library for building safe agentic AI systems. Five composable primitives that address the failure modes autonomous agents introduce: runaway failure loops, irreversible actions, unaccountable decisions, and undetected side effects.


Why agentic safety patterns matter in production

A single LLM returning a bad response is a UX problem. An autonomous agent taking a bad action in a multi-step plan is an operational one.

At scale, agentic systems interact with real infrastructure: they call APIs, modify state, route content, allocate resources. When they fail — and they will fail — the question is whether the failure is contained and recoverable, or whether it compounds. The difference is engineering, not model capability.

These patterns are the engineering half. They don't prevent agents from making mistakes. They ensure that when mistakes happen, the system can detect them, explain them, audit them, and undo them.


Pattern Catalogue

| Pattern | Problem | Key mechanism |
|---|---|---|
| Circuit Breaker | Agent stuck in a failure loop causes cascading damage | Threshold + rate-based tripping, HALF_OPEN probe recovery |
| Explainability Hooks | Agent actions are a black box; no reasoning trail for debugging | Pre/post/error decorator hooks producing ActionExplanation records |
| Counterfactual Estimator | Agent takes a high-impact action without an impact assessment | impact_delta = abs(proposed − counterfactual); gate on threshold |
| Audit Logger | No tamper-evident record of what the agent did and why | SHA-256 hash-chained JSONL event log with causal chain IDs |
| Rollback Manager | Agent failure mid-plan leaves the system in a corrupt state | Checkpoint stack with safe-point targeting and per-action undo hooks |

Installation

git clone https://github.com/mukund1985/agentic-safety-patterns.git
cd agentic-safety-patterns
pip install -r requirements.txt
pip install -e .

Quick Start

from patterns import (
    AgentCircuitBreaker, CircuitBreakerConfig,
    ExplainabilityHookManager,
    CounterfactualEstimator,
    AuditLogger,
    RollbackManager,
)
from patterns.audit_logger import InMemoryAuditSink

agent_id = "content-ranker-v3"
user_id = "uid_123"
agent_state = {"ranked": []}  # placeholder for your agent's serializable state

# ── Circuit breaker ───────────────────────────────────────────
cb = AgentCircuitBreaker(
    agent_id=agent_id,
    config=CircuitBreakerConfig(failure_threshold=5, cooldown_period_s=60.0),
)

@cb.protect
def call_ranking_service(user_id: str, content_ids: list[str]) -> list[str]:
    # ranking_api stands in for your downstream ranking client
    return ranking_api.rank(user_id, content_ids)

# ── Explainability hooks ──────────────────────────────────────
hooks = ExplainabilityHookManager(agent_id=agent_id)

@hooks.explain(
    reasoning="Ranking content based on engagement signals and safety classifier output.",
    confidence=0.91,
    alternatives_considered=["chronological sort", "diversity-reranked"],
)
def rank_content(user_id: str, content_ids: list[str]) -> list[str]:
    return call_ranking_service(user_id, content_ids)

# ── Counterfactual gate ───────────────────────────────────────
estimator = CounterfactualEstimator(block_threshold=0.75, review_threshold=0.40)

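def safe_rank(user_id: str, content_ids: list[str]) -> list[str]:
    # gate() raises ActionBlockedError when the recommendation is "block";
    # a "review" recommendation is returned in the assessment for you to act on.
    assessment = estimator.gate(
        "rank_content",
        {"user_id": user_id, "content_ids": content_ids},
    )
    return rank_content(user_id=user_id, content_ids=content_ids)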

# ── Audit log ─────────────────────────────────────────────────
sink = InMemoryAuditSink()
audit = AuditLogger(agent_id=agent_id, sinks=[sink])
audit.log_action_proposed("rank_content", user_id=user_id)

# ── Rollback ──────────────────────────────────────────────────
mgr = RollbackManager(agent_id=agent_id)
cp = mgr.checkpoint("before_ranking", state=agent_state, is_safe_point=True)

try:
    result = safe_rank(user_id="uid_123", content_ids=["p1", "p2", "p3"])
    audit.log_action_executed("rank_content", result_count=len(result))
except Exception as exc:
    audit.log_action_blocked("rank_content", reason=str(exc))
    mgr.rollback_to_last_safe_point()

Pattern Details

Circuit Breaker

Thread-safe circuit breaker with three states: CLOSED (normal), OPEN (halted), HALF_OPEN (probe recovery). Supports both threshold-based tripping (N consecutive failures) and rate-based tripping (error rate over a rolling window).

cb = AgentCircuitBreaker(
    agent_id="planner",
    config=CircuitBreakerConfig(
        failure_threshold=5,          # consecutive failures to open
        error_rate_threshold=0.50,    # fraction of failures over window
        error_rate_window_s=300.0,    # rolling window
        cooldown_period_s=60.0,       # time in OPEN before HALF_OPEN probe
    ),
    on_state_change=lambda aid, old, new: alert(f"{aid}: {old} → {new}"),
)

# Use as decorator
@cb.protect
def execute_tool(tool: str, args: dict): ...

# Or as context manager
with cb.guard():
    agent.act(observation)
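To make the state machine concrete, here is a minimal, library-independent sketch of the CLOSED / OPEN / HALF_OPEN mechanism with both tripping rules. The names `MiniCircuitBreaker`, `CircuitOpenError`, the `min_calls` guard and the injectable `clock` are hypothetical illustrations, not part of `AgentCircuitBreaker`'s API.

```python
import threading
import time
from collections import deque
from enum import Enum
from typing import Any, Callable, Deque, Tuple


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the breaker is OPEN."""


class MiniCircuitBreaker:
    """Hypothetical three-state breaker; not the AgentCircuitBreaker API."""

    def __init__(self, failure_threshold: int = 5, error_rate_threshold: float = 0.5,
                 error_rate_window_s: float = 300.0, cooldown_period_s: float = 60.0,
                 min_calls: int = 10, clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.error_rate_threshold = error_rate_threshold
        self.error_rate_window_s = error_rate_window_s
        self.cooldown_period_s = cooldown_period_s
        self.min_calls = min_calls  # don't judge the error rate on too few samples
        self._clock = clock
        self._lock = threading.Lock()
        self.state = State.CLOSED
        self._consecutive_failures = 0
        self._opened_at = 0.0
        self._outcomes: Deque[Tuple[float, bool]] = deque()  # (timestamp, succeeded)

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        with self._lock:
            if self.state is State.OPEN:
                if self._clock() - self._opened_at < self.cooldown_period_s:
                    raise CircuitOpenError("circuit is OPEN; call rejected")
                self.state = State.HALF_OPEN  # cooldown over: let calls probe recovery
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._record(success=False)
            raise
        self._record(success=True)
        return result

    def _record(self, success: bool) -> None:
        with self._lock:
            now = self._clock()
            # Keep only outcomes inside the rolling error-rate window.
            self._outcomes.append((now, success))
            while now - self._outcomes[0][0] > self.error_rate_window_s:
                self._outcomes.popleft()
            if success:
                self._consecutive_failures = 0
                if self.state is State.HALF_OPEN:
                    self.state = State.CLOSED  # probe succeeded: resume normal operation
                    self._outcomes.clear()
                return
            self._consecutive_failures += 1
            failures = sum(1 for _, ok in self._outcomes if not ok)
            rate_tripped = (len(self._outcomes) >= self.min_calls
                            and failures / len(self._outcomes) >= self.error_rate_threshold)
            if (self.state is State.HALF_OPEN
                    or self._consecutive_failures >= self.failure_threshold
                    or rate_tripped):
                self.state = State.OPEN  # a failed probe or a tripped threshold halts calls
                self._opened_at = now
```

The `min_calls` guard keeps a single early failure from reading as a 100% error rate, and injecting `clock` lets tests advance time without sleeping.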

Explainability Hooks

Decorator factory that wraps agent actions and emits structured ActionExplanation records at pre-action, post-action, and on-error phases. Hooks are non-blocking — failures in callbacks never propagate to the agent.

hooks = ExplainabilityHookManager("my-agent")
hooks.register_callback(lambda exp: metrics_db.insert(exp.to_dict()))

@hooks.explain(
    reasoning_fn=lambda tool, args: f"Calling {tool} to resolve {args.get('query')}",
    confidence=0.88,
)
def call_tool(tool: str, args: dict): ...

# Retrieve for audit
errors = hooks.get_explanations(phase=HookPhase.ON_ERROR)
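The pre/post/error hook flow can be sketched with a plain decorator factory. `MiniHookManager`, `Explanation` and `Phase` are hypothetical stand-ins for `ExplainabilityHookManager`, `ActionExplanation` and `HookPhase`; the field names are assumptions, not the library's schema.

```python
import functools
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, List, Optional, Sequence


class Phase(Enum):
    PRE_ACTION = "pre_action"
    POST_ACTION = "post_action"
    ON_ERROR = "on_error"


@dataclass
class Explanation:
    agent_id: str
    action: str
    phase: Phase
    reasoning: str
    confidence: float
    alternatives: List[str] = field(default_factory=list)
    error: Optional[str] = None
    timestamp: float = field(default_factory=time.time)


class MiniHookManager:
    """Hypothetical stand-in for ExplainabilityHookManager."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self.records: List[Explanation] = []
        self._callbacks: List[Callable[[Explanation], None]] = []

    def register_callback(self, callback: Callable[[Explanation], None]) -> None:
        self._callbacks.append(callback)

    def _emit(self, exp: Explanation) -> None:
        self.records.append(exp)
        for callback in self._callbacks:
            try:
                callback(exp)
            except Exception:
                pass  # non-blocking: a broken sink must never break the agent

    def explain(self, reasoning: str = "", confidence: float = 1.0,
                alternatives_considered: Sequence[str] = (),
                reasoning_fn: Optional[Callable[..., str]] = None):
        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            @functools.wraps(fn)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                # Reasoning may be static or derived from the call's arguments.
                text = reasoning_fn(*args, **kwargs) if reasoning_fn else reasoning

                def record(phase: Phase, error: Optional[str] = None) -> Explanation:
                    return Explanation(self.agent_id, fn.__name__, phase, text,
                                       confidence, list(alternatives_considered), error)

                self._emit(record(Phase.PRE_ACTION))
                try:
                    result = fn(*args, **kwargs)
                except Exception as exc:
                    self._emit(record(Phase.ON_ERROR, error=repr(exc)))
                    raise  # the original error still reaches the caller
                self._emit(record(Phase.POST_ACTION))
                return result
            return wrapper
        return decorator

    def get_explanations(self, phase: Optional[Phase] = None) -> List[Explanation]:
        return [r for r in self.records if phase is None or r.phase is phase]
```

Note that the hooks only observe: the wrapped function's return value and exceptions pass through unchanged.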

Counterfactual Estimator

Estimates impact_delta = |proposed_impact − counterfactual_impact| using a pluggable impact model. Classifies impact as NEGLIGIBLE / LOW / MODERATE / HIGH / CRITICAL and recommends proceed / review / block.

# Plug in your own model: LLM judge, regression, or rules engine
def my_impact_model(action_name: str, args: dict) -> float:
    return risk_scorer.score(action_name, args)

estimator = CounterfactualEstimator(
    impact_model=my_impact_model,
    block_threshold=0.75,
    review_threshold=0.40,
)

# gate() raises ActionBlockedError if recommended_action == "block"
assessment = estimator.gate("send_push_notification", {"user_id": "u123"})
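A minimal sketch of the delta, classification and gating logic, assuming the counterfactual is scored by a second model that defaults to "not acting has zero impact". `MiniEstimator`, `BlockedError` and the `LEVELS` boundaries are hypothetical; the library's actual level cut-offs are not documented here.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict

ImpactModel = Callable[[str, Dict[str, Any]], float]

# Hypothetical level boundaries on impact_delta (exclusive upper bounds).
LEVELS = [(0.05, "NEGLIGIBLE"), (0.20, "LOW"), (0.40, "MODERATE"), (0.75, "HIGH")]


class BlockedError(RuntimeError):
    """Stand-in for the library's ActionBlockedError."""


@dataclass(frozen=True)
class Assessment:
    action: str
    proposed_impact: float
    counterfactual_impact: float
    impact_delta: float
    level: str
    recommended_action: str  # "proceed" | "review" | "block"


def classify(delta: float) -> str:
    for upper_bound, name in LEVELS:
        if delta < upper_bound:
            return name
    return "CRITICAL"


class MiniEstimator:
    def __init__(self, impact_model: ImpactModel,
                 counterfactual_model: ImpactModel = lambda action, args: 0.0,
                 block_threshold: float = 0.75, review_threshold: float = 0.40) -> None:
        self.impact_model = impact_model
        # Default counterfactual: assume not acting has zero impact.
        self.counterfactual_model = counterfactual_model
        self.block_threshold = block_threshold
        self.review_threshold = review_threshold

    def assess(self, action: str, args: Dict[str, Any]) -> Assessment:
        proposed = self.impact_model(action, args)
        counterfactual = self.counterfactual_model(action, args)
        delta = abs(proposed - counterfactual)
        if delta >= self.block_threshold:
            recommendation = "block"
        elif delta >= self.review_threshold:
            recommendation = "review"
        else:
            recommendation = "proceed"
        return Assessment(action, proposed, counterfactual, delta, classify(delta), recommendation)

    def gate(self, action: str, args: Dict[str, Any]) -> Assessment:
        assessment = self.assess(action, args)
        if assessment.recommended_action == "block":
            raise BlockedError(f"{action} blocked: impact_delta={assessment.impact_delta:.2f}")
        return assessment
```

Keeping `assess()` separate from `gate()` lets a dashboard or review queue inspect the numbers without risking an exception.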

Audit Logger

Append-only, hash-chained event log. Each event carries a SHA-256 hash of the previous event — making post-hoc tampering detectable. Pluggable sinks: FileAuditSink (JSONL), InMemoryAuditSink (testing), or implement AuditSink for Kafka/Redis Streams.

audit = AuditLogger(
    agent_id="planner-v2",
    sinks=[FileAuditSink("logs/agent_audit.jsonl")],
)

audit.log_action_proposed("write_db", query="INSERT INTO ...", user_id="u123")
audit.log_action_executed("write_db", rows_affected=42)
audit.log_policy_violation("write_db", policy="PII_ACCESS_RESTRICTED")
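The hash-chaining idea fits in a few lines of standard-library Python. This is a sketch under assumptions: `MiniAuditChain`, `event_digest`, `verify_chain` and the event field names are hypothetical and do not reflect `AuditLogger`'s on-disk format.

```python
import hashlib
import json
import time
from typing import Any, Dict, List

GENESIS_HASH = "0" * 64  # prev_hash of the first event


def event_digest(event: Dict[str, Any]) -> str:
    """SHA-256 over a canonical JSON encoding of every field except 'hash'."""
    payload = {k: v for k, v in event.items() if k != "hash"}
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class MiniAuditChain:
    """Hypothetical in-memory hash chain; not the AuditLogger API."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self.events: List[Dict[str, Any]] = []

    def append(self, event_type: str, action: str, **details: Any) -> Dict[str, Any]:
        event = {
            "seq": len(self.events),
            "agent_id": self.agent_id,
            "event_type": event_type,
            "action": action,
            "details": details,  # must be JSON-serializable
            "ts": time.time(),
            "prev_hash": self.events[-1]["hash"] if self.events else GENESIS_HASH,
        }
        event["hash"] = event_digest(event)
        self.events.append(event)
        return event

    def to_jsonl(self) -> str:
        return "\n".join(json.dumps(e, sort_keys=True) for e in self.events)


def verify_chain(events: List[Dict[str, Any]]) -> bool:
    """True if every event links to its predecessor and its own hash still matches."""
    expected_prev = GENESIS_HASH
    for event in events:
        if event["prev_hash"] != expected_prev or event["hash"] != event_digest(event):
            return False
        expected_prev = event["hash"]
    return True
```

A hash chain proves only that the log is internally consistent: anyone able to rewrite the whole file can recompute every hash, so the latest hash is typically anchored somewhere the agent cannot write.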

Rollback Manager

Checkpoint stack with configurable max depth. Supports rollback by checkpoint ID, to the last safe point, or by N steps. Per-action rollback hooks let you execute custom undo logic (e.g. reverse a database write, cancel an API call).

mgr = RollbackManager(agent_id="executor", max_checkpoints=100)
mgr.register_rollback_hook("write_db", lambda name, delta: db.delete(delta["inserted_ids"]))

cp = mgr.checkpoint("before_write", state=agent_state, is_safe_point=True)
mgr.checkpoint("write_db", state=new_state, environment_delta={"inserted_ids": [1, 2, 3]})

# On failure — runs registered hooks in reverse, restores state
mgr.rollback_to_last_safe_point(execute_hooks=True)
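The stack-plus-undo-hooks behaviour can be sketched as follows. `MiniRollbackManager` and `Checkpoint` are hypothetical; the method names mirror the calls above for readability, but this is not the library's implementation, and the oldest-first eviction policy is an assumption.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

UndoHook = Callable[[str, Dict[str, Any]], None]


@dataclass
class Checkpoint:
    name: str
    state: Any
    is_safe_point: bool = False
    environment_delta: Dict[str, Any] = field(default_factory=dict)


class MiniRollbackManager:
    """Hypothetical checkpoint stack; not the RollbackManager API."""

    def __init__(self, max_checkpoints: int = 100) -> None:
        self.max_checkpoints = max_checkpoints
        self.stack: List[Checkpoint] = []
        self._hooks: Dict[str, UndoHook] = {}

    def register_rollback_hook(self, name: str, hook: UndoHook) -> None:
        self._hooks[name] = hook

    def checkpoint(self, name: str, state: Any, is_safe_point: bool = False,
                   environment_delta: Optional[Dict[str, Any]] = None) -> Checkpoint:
        # Deep-copy so later mutations of the live state can't alter the snapshot.
        cp = Checkpoint(name, copy.deepcopy(state), is_safe_point, dict(environment_delta or {}))
        self.stack.append(cp)
        if len(self.stack) > self.max_checkpoints:
            self.stack.pop(0)  # evict the oldest; it can no longer be rolled back
        return cp

    def rollback_to_last_safe_point(self, execute_hooks: bool = True) -> Any:
        safe_index = next((i for i in range(len(self.stack) - 1, -1, -1)
                           if self.stack[i].is_safe_point), None)
        if safe_index is None:
            raise LookupError("no safe point on the checkpoint stack")
        # Undo every newer checkpoint, newest first, then return the safe state.
        while len(self.stack) > safe_index + 1:
            cp = self.stack.pop()
            hook = self._hooks.get(cp.name)
            if execute_hooks and hook is not None:
                hook(cp.name, cp.environment_delta)
        return copy.deepcopy(self.stack[safe_index].state)
```

Running hooks newest-first matters when later actions depend on earlier ones, for example undoing a row update before deleting the row it referenced.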

End-to-End Demo

See examples/safe_agent_demo.py for a complete walkthrough of all five patterns composing in a realistic content-ranking agent loop — including failure injection, rollback, and audit trail inspection.

python examples/safe_agent_demo.py

How these patterns are used in real agentic systems

In production agentic pipelines operating at scale:

  • Circuit breakers sit at the tool execution boundary, preventing an agent from hammering a degraded downstream service or looping on malformed plan steps.
  • Explainability hooks are registered on all tool calls and feed structured records to a real-time observability dashboard and an offline eval pipeline.
  • Counterfactual gates run before any action with affected_scope = "cohort" or "global" — actions that affect more than a single user require pre-execution impact sign-off.
  • Audit logs are written to append-only streams (Kafka, Redis Streams) and retained for compliance, post-incident forensics, and model behaviour auditing.
  • Rollback checkpoints are created at every plan step boundary. On plan failure, the orchestrator rolls back to the last verified safe point before retrying or escalating to human review.
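The orchestration flow in the last two bullets can be sketched as a small plan loop: gate cohort- and global-scope steps on sign-off, snapshot state at each step boundary, restore the snapshot on failure, retry, then escalate. `PlanStep`, `run_plan`, the `approve` callback and `NeedsHumanReview` are hypothetical; plain in-memory snapshots stand in for `RollbackManager`, and `approve` stands in for a `CounterfactualEstimator` decision.

```python
import copy
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

AgentState = Dict[str, Any]
GATED_SCOPES = {"cohort", "global"}  # scopes that need impact sign-off


@dataclass
class PlanStep:
    name: str
    affected_scope: str  # hypothetical labels: "user", "cohort" or "global"
    run: Callable[[AgentState], None]  # mutates the agent state in place


class NeedsHumanReview(Exception):
    """Raised when the orchestrator escalates instead of continuing."""


def run_plan(steps: List[PlanStep], state: AgentState,
             approve: Callable[[PlanStep], bool], max_retries: int = 1) -> AgentState:
    safe_point = copy.deepcopy(state)  # last verified state
    for step in steps:
        if step.affected_scope in GATED_SCOPES and not approve(step):
            raise NeedsHumanReview(f"{step.name}: no pre-execution impact sign-off")
        for _ in range(max_retries + 1):
            try:
                step.run(state)
                break
            except Exception:
                # Roll back partial changes before retrying.
                state.clear()
                state.update(copy.deepcopy(safe_point))
        else:
            raise NeedsHumanReview(f"{step.name} failed {max_retries + 1} times; rolled back")
        safe_point = copy.deepcopy(state)  # step boundary becomes the new safe point
    return state
```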

Running Tests

pytest tests/ -v

Contributing

Contributions are welcome. Priority areas:

  • New patterns — rate limiting per agent action, human-in-the-loop escalation gates, sandboxed tool execution
  • Async variants — async-native circuit breaker and rollback manager for high-throughput async agent frameworks
  • Sink implementations — Kafka producer, Redis Streams, OpenTelemetry trace export
  • Integration examples — LangChain, LlamaIndex, AutoGen, custom tool-calling loops

Please open an issue to discuss scope before submitting a large PR.


License

MIT — see LICENSE
