# agentic-data-contracts

PyPI version CI Python 3.12+ License: MIT

Stop your AI agents from running wild on your data.

agentic-data-contracts lets data engineers define governance contracts in YAML — what tables an agent may query, which operations are forbidden, what resource limits apply — and enforces them automatically at query time via SQL validation powered by sqlglot.

Why? AI agents querying databases face two problems: resource runaway (unbounded compute, endless retries, cost overruns) and semantic inconsistency (wrong tables, missing filters, ad-hoc metric definitions). This library addresses both with a single YAML contract.

Works with: Claude Agent SDK (primary target), or any Python agent framework. Optionally integrates with ai-agent-contracts for formal resource governance.

## How It Works

```text
Agent: "SELECT * FROM analytics.orders"
  -> BLOCKED (no SELECT * — specify explicit columns)

Agent: "SELECT order_id, amount FROM analytics.orders"
  -> BLOCKED (missing required filter: tenant_id)

Agent: "SELECT order_id, amount FROM analytics.orders WHERE tenant_id = 'acme'"
  -> PASSED + WARN (consider using semantic revenue definition)

Agent: "DELETE FROM analytics.orders WHERE id = 1"
  -> BLOCKED (forbidden operation: DELETE)
```

The contract defines the rules. The library enforces them — before the query ever reaches the database.

## Installation

```bash
uv add agentic-data-contracts
# or
pip install agentic-data-contracts
```

With optional database adapters:

```bash
uv add "agentic-data-contracts[duckdb]"      # DuckDB
uv add "agentic-data-contracts[bigquery]"    # BigQuery
uv add "agentic-data-contracts[snowflake]"   # Snowflake
uv add "agentic-data-contracts[postgres]"    # PostgreSQL
uv add "agentic-data-contracts[agent-sdk]"   # Claude Agent SDK integration
```

## Quick Start

### 1. Write a YAML contract

```yaml
# contract.yml
version: "1.0"
name: revenue-analysis

semantic:
  source:
    type: yaml
    path: "./semantic.yml"
  allowed_tables:
    - schema: analytics
      tables: ["*"]          # all tables in schema (discovered from database)
    - schema: marketing
      tables: [campaigns]    # or list specific tables
  forbidden_operations: [DELETE, DROP, TRUNCATE, UPDATE, INSERT]
  rules:
    - name: tenant_isolation
      description: "All queries must filter by tenant_id"
      enforcement: block
      query_check:
        required_filter: tenant_id
    - name: no_select_star
      description: "Must specify explicit columns"
      enforcement: block
      query_check:
        no_select_star: true

resources:
  cost_limit_usd: 5.00
  max_retries: 3
  token_budget: 50000

temporal:
  max_duration_seconds: 300
```

### 2. Load the contract and create tools

```python
from agentic_data_contracts import DataContract, create_tools
from agentic_data_contracts.adapters.duckdb import DuckDBAdapter

dc = DataContract.from_yaml("contract.yml")
adapter = DuckDBAdapter("analytics.duckdb")

# Semantic source is auto-loaded from contract config (source.type + source.path)
tools = create_tools(dc, adapter=adapter)
```

### 3. Use with the Claude Agent SDK (requires `claude-agent-sdk>=0.1.52`)

```python
import asyncio
from agentic_data_contracts import create_sdk_mcp_server
from claude_agent_sdk import (
    ClaudeAgentOptions,
    AssistantMessage,
    TextBlock,
    query,
)

# One-liner: wraps all 10 tools and bundles into an SDK MCP server
server = create_sdk_mcp_server(dc, adapter=adapter)

options = ClaudeAgentOptions(
    model="claude-sonnet-4-6",
    system_prompt=f"You are a revenue analytics assistant.\n\n{dc.to_system_prompt()}",
    mcp_servers={"dc": server},
    **dc.to_sdk_config(),  # token_budget → task_budget, max_retries → max_turns
)

async def run(prompt: str) -> None:
    async for message in query(prompt=prompt, options=options):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if isinstance(block, TextBlock):
                    print(block.text)

asyncio.run(run("What was total revenue by region in Q1 2025?"))
```

### 4. Or use the tools directly (no SDK required)

```python
import asyncio

async def demo() -> None:
    # Validate a query without executing
    validate = next(t for t in tools if t.name == "validate_query")
    result = await validate.callable(
        {"sql": "SELECT id, amount FROM analytics.orders WHERE tenant_id = 'acme'"}
    )
    print(result["content"][0]["text"])
    # VALID — Query passed all checks.

    # Blocked query
    result = await validate.callable({"sql": "SELECT * FROM analytics.orders"})
    print(result["content"][0]["text"])
    # BLOCKED — Violations:
    # - SELECT * is not allowed — specify explicit columns

asyncio.run(demo())
```

## The 10 Tools

| Tool | Description |
| --- | --- |
| `list_schemas` | List all allowed database schemas from the contract |
| `list_tables` | List allowed tables, optionally filtered by schema |
| `describe_table` | Get full column details for an allowed table |
| `preview_table` | Preview sample rows from an allowed table |
| `list_metrics` | List metric definitions, optionally filtered by domain |
| `lookup_metric` | Get a metric definition; falls back to fuzzy search when there is no exact match |
| `validate_query` | Validate a SQL query against contract rules without executing it |
| `query_cost_estimate` | Estimate cost and row count via EXPLAIN |
| `run_query` | Validate and execute a SQL query, returning results |
| `get_contract_info` | Get the full contract: rules, limits, and session status |

## Contract Rules

Rules are enforced at three levels:

- `block` — query is rejected and an error is returned to the agent
- `warn` — query proceeds but a warning is included in the response
- `log` — violation is recorded but not surfaced to the agent

Each rule carries a `query_check` (pre-execution) or `result_check` (post-execution) block. Rules with neither are advisory: they appear in the system prompt but don't enforce anything. Every rule can be scoped to a specific table or applied globally.
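
The three levels amount to a small dispatch at validation time. A hypothetical sketch (the function and its error types are illustrative, not the library's internals):

```python
import logging

def apply_enforcement(level: str, violation: str, warnings: list[str]) -> None:
    """Dispatch a rule violation according to its enforcement level."""
    if level == "block":
        # Rejected outright; the error text goes back to the agent.
        raise PermissionError(violation)
    if level == "warn":
        # Query proceeds; the warning rides along in the tool response.
        warnings.append(violation)
        return
    # "log": recorded for audit, invisible to the agent.
    logging.getLogger("contracts").info(violation)
```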

Built-in query checks (pre-execution, validated against the SQL AST):

| Check | Description |
| --- | --- |
| `required_filter` | Require a column in the WHERE clause (e.g., `tenant_id`) |
| `no_select_star` | Forbid `SELECT *` — require explicit columns |
| `blocked_columns` | Forbid specific columns in SELECT (e.g., PII) |
| `require_limit` | Require a LIMIT clause |
| `max_joins` | Cap the number of JOINs |

Built-in result checks (post-execution, validated against query output):

| Check | Description |
| --- | --- |
| `min_value` / `max_value` | Numeric bounds on a column's values |
| `not_null` | Column must not contain nulls |
| `min_rows` / `max_rows` | Row-count bounds on the result set |

Example with table scoping and both check types:

```yaml
rules:
  - name: tenant_isolation
    description: "Orders must filter by tenant_id"
    enforcement: block
    table: "analytics.orders"      # only applies to this table
    query_check:
      required_filter: tenant_id

  - name: hide_pii
    description: "Do not select PII columns from customers"
    enforcement: block
    table: "analytics.customers"
    query_check:
      blocked_columns: [ssn, email, phone]

  - name: wau_sanity
    description: "WAU should not exceed world population"
    enforcement: warn
    table: "analytics.user_metrics"
    result_check:
      column: wau
      max_value: 8_000_000_000

  - name: no_negative_revenue
    description: "Revenue must not be negative"
    enforcement: block
    result_check:
      column: revenue
      min_value: 0
```
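
Result checks run after execution against the returned rows. A minimal sketch of how `min_value`, `max_value`, and `not_null` could be evaluated (illustrative only; the library's actual evaluator may differ):

```python
def check_results(rows: list[dict], column: str,
                  min_value=None, max_value=None, not_null=False) -> list[str]:
    """Evaluate post-execution bounds on one column of a result set."""
    violations = []
    for i, row in enumerate(rows):
        value = row.get(column)
        if value is None:
            if not_null:
                violations.append(f"row {i}: {column} is null")
            continue
        if min_value is not None and value < min_value:
            violations.append(f"row {i}: {column}={value} below min {min_value}")
        if max_value is not None and value > max_value:
            violations.append(f"row {i}: {column}={value} above max {max_value}")
    return violations
```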

## Semantic Sources

A semantic source provides metric, table schema, and relationship metadata to the agent. Paths are resolved relative to the contract file's directory (not the process CWD).

YAML (built-in):

```yaml
# semantic.yml
metrics:
  - name: total_revenue
    description: "Total revenue from completed orders"
    sql_expression: "SUM(amount) FILTER (WHERE status = 'completed')"
    source_model: analytics.orders

tables:
  - schema: analytics
    table: orders
    columns:
      - name: id
        type: INTEGER
      - name: amount
        type: DECIMAL
      - name: tenant_id
        type: VARCHAR
```

dbt — point to a `manifest.json`:

```yaml
semantic:
  source:
    type: dbt
    path: "./dbt/manifest.json"
```

Cube — point to a Cube schema file:

```yaml
semantic:
  source:
    type: cube
    path: "./cube/schema.yml"
```

## Table Relationships

Define join paths so the agent knows how to combine tables correctly:

```yaml
# semantic.yml
relationships:
  - from: analytics.orders.customer_id
    to: analytics.customers.id
    type: many_to_one
    description: >
      Join orders to customers for region-level breakdowns.
      Every order has exactly one customer.

  - from: analytics.bdg_attribution.contact_id
    to: analytics.contacts.contact_id
    type: many_to_one
    description: "Bridge table — filter to avoid fan-out from multiple attribution records."
    required_filter: "attribution_model = 'last_touch_attribution'"
```

| Field | Required | Description |
| --- | --- | --- |
| `from` / `to` | Yes | Fully qualified column references (`schema.table.column`) |
| `type` | No | Cardinality: `many_to_one` (default), `one_to_one`, `many_to_many` |
| `description` | No | Free-text context for the agent (join guidance, caveats, data quality notes) |
| `required_filter` | No | SQL condition that must be applied when using this join (e.g., bridge-table disambiguation) |

The agent sees these in its system prompt and uses them to write correct JOINs instead of guessing from column names.
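
For illustration, here is roughly how one relationship entry translates into JOIN guidance. The `join_clause` helper is hypothetical, not part of the library's API:

```python
def join_clause(rel: dict) -> str:
    """Render a relationship entry as a SQL JOIN fragment."""
    _, from_table, from_col = rel["from"].split(".")
    to_schema, to_table, to_col = rel["to"].split(".")
    clause = (f"JOIN {to_schema}.{to_table} "
              f"ON {from_table}.{from_col} = {to_table}.{to_col}")
    if rel.get("required_filter"):
        # Bridge-table disambiguation rides along with the join condition.
        clause += f" AND {rel['required_filter']}"
    return clause

rel = {
    "from": "analytics.orders.customer_id",
    "to": "analytics.customers.id",
    "type": "many_to_one",
}
print(join_clause(rel))
# JOIN analytics.customers ON orders.customer_id = customers.id
```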

## Custom Prompt Rendering

The system prompt is generated by a `PromptRenderer`. The default `ClaudePromptRenderer` produces XML-structured output optimized for Claude models:

```python
dc = DataContract.from_yaml("contract.yml")
print(dc.to_system_prompt())  # XML output, optimized for Claude
```

For other models (GPT-4, Gemini, Llama), implement the `PromptRenderer` protocol:

```python
from agentic_data_contracts import PromptRenderer, DataContract

class MarkdownRenderer:
    def render(self, contract, semantic_source=None):
        tables = "\n".join(f"- {t}" for t in contract.allowed_table_names())
        return f"## {contract.name}\n\nAllowed tables:\n{tables}"

dc = DataContract.from_yaml("contract.yml")
print(dc.to_system_prompt(renderer=MarkdownRenderer()))
```

## Scalable Metric Discovery

For large data lakes with hundreds of KPIs, group metrics by domain and let the agent discover them efficiently:

```yaml
semantic:
  domains:
    acquisition: [CAC, CPA, CPL, click_through_rate]
    retention: [churn_rate, LTV, retention_30d]
    attribution: [ROAS, first_touch_revenue]
```

The system prompt gets a compact index (names + descriptions grouped by domain). The agent uses lookup_metric for full SQL definitions — with fuzzy fallback when it doesn't know the exact name:

```text
lookup_metric("CAC")                → exact match, full definition
lookup_metric("acquisition cost")   → fuzzy match, returns [CAC, CPA] as candidates
list_metrics(domain="retention")    → only retention metrics
```
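
The fallback behaviour can be approximated with the standard library's `difflib` (the library itself uses `thefuzz`; the metric index and definitions below are made up for illustration):

```python
import difflib

# Hypothetical in-memory metric index for illustration.
METRICS = {
    "CAC": "SUM(marketing_spend) / COUNT(DISTINCT new_customer_id)",
    "CPA": "SUM(spend) / COUNT(conversion_id)",
    "churn_rate": "1.0 * churned_users / active_users_at_start",
}

def lookup_metric(name: str) -> dict:
    """Exact match first; otherwise return fuzzy candidates."""
    if name in METRICS:
        return {"match": name, "definition": METRICS[name]}
    lowered = {m.lower(): m for m in METRICS}
    hits = difflib.get_close_matches(name.lower(), lowered, n=3, cutoff=0.4)
    return {"candidates": [lowered[h] for h in hits]}
```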

## Scaling to Large Organizations

Tested with 200+ tables, 300+ metrics, and 50+ relationships across multiple schemas.

| Concern | How it scales |
| --- | --- |
| System prompt size | With >20 metrics, auto-switches to compact domain counts (`acquisition (45)`) instead of listing every metric |
| Table discovery | `list_tables` is paginated (default 50, with offset); use the schema filter for targeted browsing |
| Wildcard schemas | `tables: ["*"]` discovers tables from the database; resolution is cached, so there are no repeated queries |
| Metric lookup | Fuzzy search via `thefuzz` (C++ backed); sub-millisecond even with 1000+ metrics |
| SQL validation | Set-based allowlist check: O(1) per table reference regardless of allowlist size |

## Resource Limits

```yaml
resources:
  cost_limit_usd: 5.00           # max estimated query cost
  max_retries: 3                 # max blocked queries per session
  token_budget: 50000            # max tokens consumed
  max_query_time_seconds: 30     # max wall-clock query time
  max_rows_scanned: 1000000      # max rows an EXPLAIN may estimate
```
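
How `cost_limit_usd` and `max_retries` could be tracked over a session, as an illustrative sketch (the library's actual accounting is not shown here and may differ):

```python
from dataclasses import dataclass

@dataclass
class SessionBudget:
    # Limits mirror the `resources` block above.
    cost_limit_usd: float = 5.00
    max_retries: int = 3
    spent_usd: float = 0.0
    blocked_queries: int = 0

    def charge(self, estimated_cost_usd: float) -> None:
        """Reserve the estimated cost before running a query."""
        if self.spent_usd + estimated_cost_usd > self.cost_limit_usd:
            raise RuntimeError("cost_limit_usd exceeded for this session")
        self.spent_usd += estimated_cost_usd

    def record_block(self) -> None:
        """Count a blocked query toward the retry budget."""
        self.blocked_queries += 1
        if self.blocked_queries > self.max_retries:
            raise RuntimeError("max_retries exceeded for this session")
```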

## Optional Dependencies

| Extra | Package | Purpose |
| --- | --- | --- |
| `duckdb` | `duckdb` | DuckDB adapter |
| `bigquery` | `google-cloud-bigquery` | BigQuery adapter |
| `snowflake` | `snowflake-connector-python` | Snowflake adapter |
| `postgres` | `psycopg2-binary` | PostgreSQL adapter |
| `agent-sdk` | `claude-agent-sdk` | Claude Agent SDK integration |
| `agent-contracts` | `ai-agent-contracts>=0.2.0` | ai-agent-contracts bridge |

## Optional: Formal Governance with ai-agent-contracts

The library works standalone with lightweight enforcement. Install `ai-agent-contracts` to upgrade to the formal governance framework:

```bash
pip install "agentic-data-contracts[agent-contracts]"
```

```python
from agentic_data_contracts.bridge.compiler import compile_to_contract

contract = compile_to_contract(dc)  # YAML → formal 7-tuple Contract
```

What you get with the bridge:

| Concern | Standalone | With ai-agent-contracts |
| --- | --- | --- |
| Resource tracking | Manual counters | Formal `ResourceConstraints` with auto-enforcement |
| Rule violations | Exception + retry | `TerminationCondition` with contract state machine |
| Success evaluation | Log-based | Weighted `SuccessCriterion` scoring, LLM judge support |
| Contract lifecycle | None | DRAFTED → ACTIVE → FULFILLED / VIOLATED / TERMINATED |
| Framework support | Claude Agent SDK | + LiteLLM, LangChain, LangGraph, Google ADK |
| Multi-agent | Single agent | Coordination patterns (sequential, parallel, hierarchical) |

When to use it: formal audit trails, success scoring, multi-agent coordination, or integration with non-Claude agent frameworks.

## Example

See `examples/revenue_agent/` for a complete working example with a DuckDB database, YAML semantic source, and Claude Agent SDK integration.

```bash
uv run python examples/revenue_agent/setup_db.py
uv run python examples/revenue_agent/agent.py "What was Q1 revenue by region?"
```

## Architecture

See `docs/architecture.md` for the full design spec covering the layered architecture, YAML schema, validation pipeline, tool design, semantic sources, database adapters, and the optional ai-agent-contracts bridge.

## License

MIT
