From 743c26fdf157d6ecf176c433374831d9d687b569 Mon Sep 17 00:00:00 2001
From: pbtc21
Date: Wed, 21 Jan 2026 02:24:21 +0000
Subject: [PATCH 1/4] feat(bitcoin-agents): add backend API, tools, and
services for Phase 2
Implements Phase 2 of Bitcoin Agents backend integration:
Models (app/backend/models.py):
- BitcoinAgentStatus enum (alive/dead)
- BitcoinAgentLevel enum (hatchling/junior/senior/elder/legendary)
- BitcoinAgent model with on-chain data, computed state, face URLs
- BitcoinAgentFilter for queries
- DeathCertificate model with epitaph support
- DeathCertificateFilter
API Router (app/api/bitcoin_agents.py):
- GET /bitcoin-agents - List all agents with filters
- GET /bitcoin-agents/leaderboard - Top agents by XP
- GET /bitcoin-agents/graveyard - Dead agents memorial
- GET /bitcoin-agents/stats - Global statistics
- GET /bitcoin-agents/food-tiers - Food pricing info
- GET /bitcoin-agents/{id} - Get specific agent
- GET /bitcoin-agents/{id}/status - Computed hunger/health
- GET /bitcoin-agents/{id}/death-certificate
- POST /bitcoin-agents/mint - x402 payment flow
- POST /bitcoin-agents/{id}/feed - x402 payment flow
- POST /bitcoin-agents/{id}/check-death
- POST /bitcoin-agents/{id}/epitaph
Tools (app/tools/bitcoin_agents.py):
- Contract read functions (get_agent_state, get_computed_state, etc.)
- Transaction builders (build_mint_agent_tx, build_feed_agent_tx, etc.)
- Batch operations (check_and_process_deaths)
- LangChain tools for MCP integration
Services (app/services/bitcoin_agents/):
- face_service.py: Bitcoin Faces API integration with caching
- lifecycle_service.py: Background jobs for death checks, alerts, stats
Co-Authored-By: Claude Opus 4.5
---
app/api/bitcoin_agents.py | 395 +++++++++++++++
app/backend/models.py | 131 +++++
app/main.py | 3 +-
app/services/bitcoin_agents/__init__.py | 9 +
app/services/bitcoin_agents/face_service.py | 155 ++++++
.../bitcoin_agents/lifecycle_service.py | 289 +++++++++++
app/tools/bitcoin_agents.py | 471 ++++++++++++++++++
7 files changed, 1452 insertions(+), 1 deletion(-)
create mode 100644 app/api/bitcoin_agents.py
create mode 100644 app/services/bitcoin_agents/__init__.py
create mode 100644 app/services/bitcoin_agents/face_service.py
create mode 100644 app/services/bitcoin_agents/lifecycle_service.py
create mode 100644 app/tools/bitcoin_agents.py
diff --git a/app/api/bitcoin_agents.py b/app/api/bitcoin_agents.py
new file mode 100644
index 00000000..5b619385
--- /dev/null
+++ b/app/api/bitcoin_agents.py
@@ -0,0 +1,395 @@
+"""Bitcoin Agents API router.
+
+Provides CRUD endpoints for Tamagotchi-style AI agents with on-chain lifecycle.
+"""
+
+from typing import List, Optional
+
+from fastapi import APIRouter, HTTPException, Query
+from starlette.responses import JSONResponse
+
+from app.backend.models import (
+ BitcoinAgent,
+ BitcoinAgentFilter,
+ BitcoinAgentLevel,
+ BitcoinAgentStatus,
+ DeathCertificate,
+ DeathCertificateFilter,
+)
+from app.lib.logger import configure_logger
+
+# Configure logger
+logger = configure_logger(__name__)
+
+# Create the router
+router = APIRouter(prefix="/bitcoin-agents", tags=["bitcoin-agents"])
+
+# Contract constants
+CONTRACT_ADDRESS_MAINNET = "SP000000000000000000000000000000.bitcoin-agents" # TODO: Update after deployment
+CONTRACT_ADDRESS_TESTNET = "ST000000000000000000000000000000.bitcoin-agents" # TODO: Update after deployment
+
+# Food tier pricing (in sats)
+FOOD_TIERS = {
+ 1: {"name": "Basic", "cost": 100, "xp": 10},
+ 2: {"name": "Premium", "cost": 500, "xp": 25},
+ 3: {"name": "Gourmet", "cost": 1000, "xp": 50},
+}
+
+# Mint cost (in sats)
+MINT_COST = 10000
+
+
+@router.get("")
+async def list_agents(
+ owner: Optional[str] = Query(None, description="Filter by owner address"),
+ status: Optional[BitcoinAgentStatus] = Query(None, description="Filter by status (alive/dead)"),
+ level: Optional[BitcoinAgentLevel] = Query(None, description="Filter by evolution level"),
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+ limit: int = Query(50, ge=1, le=100, description="Max results to return"),
+ offset: int = Query(0, ge=0, description="Offset for pagination"),
+) -> JSONResponse:
+ """List all Bitcoin Agents with optional filters.
+
+ Returns agents sorted by XP (descending).
+ """
+ try:
+ logger.debug(
+ "Listing bitcoin agents",
+ extra={"owner": owner, "status": status, "level": level, "network": network},
+ )
+
+ # TODO: Implement actual contract reads and database caching
+ # For now, return empty list as placeholder
+ agents: List[dict] = []
+
+ return JSONResponse(
+ content={
+ "agents": agents,
+ "total": len(agents),
+ "limit": limit,
+ "offset": offset,
+ }
+ )
+
+ except Exception as e:
+ logger.error("Failed to list bitcoin agents", extra={"error": str(e)}, exc_info=e)
+ raise HTTPException(status_code=500, detail=f"Failed to list agents: {str(e)}")
+
+
+@router.get("/leaderboard")
+async def get_leaderboard(
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+ limit: int = Query(10, ge=1, le=50, description="Top N agents"),
+) -> JSONResponse:
+ """Get top agents by XP.
+
+ Returns the leaderboard of highest XP agents.
+ """
+ try:
+ logger.debug("Getting leaderboard", extra={"network": network, "limit": limit})
+
+ # TODO: Implement actual leaderboard query
+ leaderboard: List[dict] = []
+
+ return JSONResponse(content={"leaderboard": leaderboard, "network": network})
+
+ except Exception as e:
+ logger.error("Failed to get leaderboard", extra={"error": str(e)}, exc_info=e)
+ raise HTTPException(status_code=500, detail=f"Failed to get leaderboard: {str(e)}")
+
+
+@router.get("/graveyard")
+async def get_graveyard(
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+ limit: int = Query(50, ge=1, le=100, description="Max results"),
+ offset: int = Query(0, ge=0, description="Offset for pagination"),
+) -> JSONResponse:
+ """Get all dead agents (graveyard).
+
+ Returns death certificates sorted by death block (most recent first).
+ """
+ try:
+ logger.debug("Getting graveyard", extra={"network": network})
+
+ # TODO: Implement actual graveyard query
+ certificates: List[dict] = []
+
+ return JSONResponse(
+ content={
+ "certificates": certificates,
+ "total": len(certificates),
+ "limit": limit,
+ "offset": offset,
+ }
+ )
+
+ except Exception as e:
+ logger.error("Failed to get graveyard", extra={"error": str(e)}, exc_info=e)
+ raise HTTPException(status_code=500, detail=f"Failed to get graveyard: {str(e)}")
+
+
+@router.get("/stats")
+async def get_global_stats(
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Get global Bitcoin Agents statistics.
+
+ Returns total agents, deaths, feedings, alive count.
+ """
+ try:
+ logger.debug("Getting global stats", extra={"network": network})
+
+ # TODO: Implement actual stats query from contract
+ stats = {
+ "total_agents": 0,
+ "total_deaths": 0,
+ "total_feedings": 0,
+ "alive_count": 0,
+ "network": network,
+ }
+
+ return JSONResponse(content=stats)
+
+ except Exception as e:
+ logger.error("Failed to get stats", extra={"error": str(e)}, exc_info=e)
+ raise HTTPException(status_code=500, detail=f"Failed to get stats: {str(e)}")
+
+
+@router.get("/food-tiers")
+async def get_food_tiers() -> JSONResponse:
+ """Get available food tiers and pricing.
+
+ Returns food tier options with costs and XP rewards.
+ """
+ return JSONResponse(content={"food_tiers": FOOD_TIERS, "mint_cost": MINT_COST})
+
+
+@router.get("/{agent_id}")
+async def get_agent(
+ agent_id: int,
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Get a specific agent by ID.
+
+ Returns agent details including computed hunger/health state.
+ """
+ try:
+ logger.debug("Getting agent", extra={"agent_id": agent_id, "network": network})
+
+ # TODO: Implement actual contract read
+ # For now, return 404 as placeholder
+ raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found")
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error("Failed to get agent", extra={"agent_id": agent_id, "error": str(e)}, exc_info=e)
+ raise HTTPException(status_code=500, detail=f"Failed to get agent: {str(e)}")
+
+
+@router.get("/{agent_id}/status")
+async def get_agent_status(
+ agent_id: int,
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Get computed hunger/health status for an agent.
+
+ Returns current computed state based on blocks elapsed since last fed.
+ """
+ try:
+ logger.debug("Getting agent status", extra={"agent_id": agent_id, "network": network})
+
+ # TODO: Call contract's get-computed-state function
+ raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found")
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(
+ "Failed to get agent status",
+ extra={"agent_id": agent_id, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to get agent status: {str(e)}")
+
+
+@router.get("/{agent_id}/death-certificate")
+async def get_death_certificate(
+ agent_id: int,
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Get death certificate for a dead agent.
+
+ Returns death certificate details including epitaph if set.
+ """
+ try:
+ logger.debug(
+ "Getting death certificate",
+ extra={"agent_id": agent_id, "network": network},
+ )
+
+ # TODO: Call contract's get-death-certificate function
+ raise HTTPException(status_code=404, detail=f"Death certificate for agent {agent_id} not found")
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(
+ "Failed to get death certificate",
+ extra={"agent_id": agent_id, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to get death certificate: {str(e)}")
+
+
+# =============================================================================
+# x402 Payment Endpoints (return 402 with payment requirements)
+# =============================================================================
+
+
+@router.post("/mint")
+async def mint_agent_request(
+ name: str = Query(..., min_length=1, max_length=64, description="Agent name"),
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Request to mint a new agent.
+
+ Returns 402 Payment Required with sBTC payment details.
+ After payment is confirmed, execute the mint transaction.
+ """
+ try:
+ logger.info("Mint agent request", extra={"name": name, "network": network})
+
+ # Return 402 with payment requirements
+ contract_address = CONTRACT_ADDRESS_MAINNET if network == "mainnet" else CONTRACT_ADDRESS_TESTNET
+
+ payment_details = {
+ "status": "payment_required",
+ "action": "mint_agent",
+ "name": name,
+ "cost_sats": MINT_COST,
+ "payment_address": contract_address, # TODO: Use actual payment address
+ "network": network,
+ "message": f"Send {MINT_COST} sats to mint your Bitcoin Agent '{name}'",
+ }
+
+ return JSONResponse(status_code=402, content=payment_details)
+
+ except Exception as e:
+ logger.error("Failed to process mint request", extra={"name": name, "error": str(e)}, exc_info=e)
+ raise HTTPException(status_code=500, detail=f"Failed to process mint request: {str(e)}")
+
+
+@router.post("/{agent_id}/feed")
+async def feed_agent_request(
+ agent_id: int,
+ food_tier: int = Query(..., ge=1, le=3, description="Food tier (1=Basic, 2=Premium, 3=Gourmet)"),
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Request to feed an agent.
+
+ Returns 402 Payment Required with sBTC payment details.
+ After payment is confirmed, execute the feed transaction.
+ """
+ try:
+ logger.info(
+ "Feed agent request",
+ extra={"agent_id": agent_id, "food_tier": food_tier, "network": network},
+ )
+
+ if food_tier not in FOOD_TIERS:
+ raise HTTPException(status_code=400, detail=f"Invalid food tier: {food_tier}")
+
+ food_info = FOOD_TIERS[food_tier]
+ contract_address = CONTRACT_ADDRESS_MAINNET if network == "mainnet" else CONTRACT_ADDRESS_TESTNET
+
+ payment_details = {
+ "status": "payment_required",
+ "action": "feed_agent",
+ "agent_id": agent_id,
+ "food_tier": food_tier,
+ "food_name": food_info["name"],
+ "cost_sats": food_info["cost"],
+ "xp_reward": food_info["xp"],
+ "payment_address": contract_address, # TODO: Use actual payment address
+ "network": network,
+ "message": f"Send {food_info['cost']} sats to feed agent {agent_id} with {food_info['name']} food (+{food_info['xp']} XP)",
+ }
+
+ return JSONResponse(status_code=402, content=payment_details)
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(
+ "Failed to process feed request",
+ extra={"agent_id": agent_id, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to process feed request: {str(e)}")
+
+
+@router.post("/{agent_id}/check-death")
+async def check_agent_death(
+ agent_id: int,
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Check if an agent should die and process death if so.
+
+ Anyone can call this to process an agent's death when health reaches 0.
+ Returns whether the agent died.
+ """
+ try:
+ logger.info("Check death request", extra={"agent_id": agent_id, "network": network})
+
+ # TODO: Call contract's check-death function
+ # This is a public function anyone can call
+
+ return JSONResponse(
+ content={
+ "agent_id": agent_id,
+ "died": False, # TODO: Actual result from contract
+ "message": "Agent is still alive",
+ }
+ )
+
+ except Exception as e:
+ logger.error(
+ "Failed to check death",
+ extra={"agent_id": agent_id, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to check death: {str(e)}")
+
+
+@router.post("/{agent_id}/epitaph")
+async def write_epitaph_request(
+ agent_id: int,
+ epitaph: str = Query(..., min_length=1, max_length=256, description="Memorial text"),
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Write an epitaph for a dead agent.
+
+ Only the owner can write an epitaph, and only once.
+ """
+ try:
+ logger.info(
+ "Write epitaph request",
+ extra={"agent_id": agent_id, "epitaph_length": len(epitaph), "network": network},
+ )
+
+ # TODO: Verify ownership and call contract's write-epitaph function
+ raise HTTPException(
+ status_code=501,
+ detail="Epitaph writing not yet implemented. Requires wallet signature.",
+ )
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(
+ "Failed to write epitaph",
+ extra={"agent_id": agent_id, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to write epitaph: {str(e)}")
diff --git a/app/backend/models.py b/app/backend/models.py
index 026eb726..eb1b3124 100644
--- a/app/backend/models.py
+++ b/app/backend/models.py
@@ -1095,3 +1095,134 @@ class JobCooldownBase(CustomBaseModel):
class JobCooldown(JobCooldownBase):
id: UUID
updated_at: Optional[datetime] = None
+
+
+#
+# BITCOIN AGENTS (Tamagotchi-style AI agents)
+#
+class BitcoinAgentStatus(str, Enum):
+ """Status of a Bitcoin Agent."""
+
+ ALIVE = "alive"
+ DEAD = "dead"
+
+ def __str__(self):
+ return self.value
+
+
+class BitcoinAgentLevel(str, Enum):
+ """Evolution level of a Bitcoin Agent."""
+
+ HATCHLING = "hatchling" # 0-499 XP
+ JUNIOR = "junior" # 500-1999 XP
+ SENIOR = "senior" # 2000-9999 XP
+ ELDER = "elder" # 10000-49999 XP
+ LEGENDARY = "legendary" # 50000+ XP
+
+ def __str__(self):
+ return self.value
+
+
+class BitcoinAgentBase(CustomBaseModel):
+ """Base model for Bitcoin Agents."""
+
+ # On-chain data
+ agent_id: Optional[int] = None # On-chain agent ID
+ owner: Optional[str] = None # Stacks address
+ name: Optional[str] = None # Agent name (string-utf8 64)
+ hunger: Optional[int] = None # 0-100
+ health: Optional[int] = None # 0-100
+ xp: Optional[int] = None # Total XP earned
+ level: Optional[BitcoinAgentLevel] = BitcoinAgentLevel.HATCHLING
+ birth_block: Optional[int] = None # Block when minted
+ last_fed_block: Optional[int] = None # Block when last fed
+ total_fed_count: Optional[int] = None # Total times fed
+ status: Optional[BitcoinAgentStatus] = BitcoinAgentStatus.ALIVE
+
+ # Computed state (from get-computed-state)
+ computed_hunger: Optional[int] = None
+ computed_health: Optional[int] = None
+
+ # Bitcoin Face
+ face_svg_url: Optional[str] = None
+ face_image_url: Optional[str] = None
+ face_cached_at: Optional[datetime] = None
+
+ # Contract info
+ contract_address: Optional[str] = None # bitcoin-agents.clar address
+ network: Optional[str] = "mainnet" # mainnet or testnet
+
+ # Linked profile (optional)
+ profile_id: Optional[UUID] = None
+
+
+class BitcoinAgentCreate(BitcoinAgentBase):
+ pass
+
+
+class BitcoinAgent(BitcoinAgentBase):
+ id: UUID
+ created_at: datetime
+ updated_at: Optional[datetime] = None
+
+
+class BitcoinAgentFilter(CustomBaseModel):
+ """Filter model for Bitcoin Agents."""
+
+ agent_id: Optional[int] = None
+ owner: Optional[str] = None
+ name: Optional[str] = None
+ level: Optional[BitcoinAgentLevel] = None
+ status: Optional[BitcoinAgentStatus] = None
+ profile_id: Optional[UUID] = None
+ network: Optional[str] = None
+
+ # Range filters
+ xp_gte: Optional[int] = None
+ xp_lte: Optional[int] = None
+ hunger_lte: Optional[int] = None # Filter for hungry agents
+ health_lte: Optional[int] = None # Filter for unhealthy agents
+
+
+#
+# BITCOIN AGENT DEATH CERTIFICATES
+#
+class DeathCertificateBase(CustomBaseModel):
+ """Base model for Bitcoin Agent death certificates."""
+
+ agent_id: Optional[int] = None # On-chain agent ID
+ bitcoin_agent_db_id: Optional[UUID] = None # Reference to BitcoinAgent record
+ name: Optional[str] = None
+ owner: Optional[str] = None
+ birth_block: Optional[int] = None
+ death_block: Optional[int] = None
+ cause: Optional[str] = None # "starvation", "neglect"
+ final_level: Optional[BitcoinAgentLevel] = None
+ total_xp: Optional[int] = None
+ total_fed_count: Optional[int] = None
+ epitaph: Optional[str] = None # Owner-written memorial (string-utf8 256)
+ network: Optional[str] = "mainnet"
+
+
+class DeathCertificateCreate(DeathCertificateBase):
+ pass
+
+
+class DeathCertificate(DeathCertificateBase):
+ id: UUID
+ created_at: datetime
+
+
+class DeathCertificateFilter(CustomBaseModel):
+ """Filter model for death certificates."""
+
+ agent_id: Optional[int] = None
+ owner: Optional[str] = None
+ final_level: Optional[BitcoinAgentLevel] = None
+ cause: Optional[str] = None
+ network: Optional[str] = None
+
+ # Range filters for lifespan queries
+ death_block_gte: Optional[int] = None
+ death_block_lte: Optional[int] = None
+ total_xp_gte: Optional[int] = None
diff --git a/app/main.py b/app/main.py
index e61a5c9c..479de256 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,7 +1,7 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
-from app.api import agents, daos, tools, webhooks, profiles
+from app.api import agents, bitcoin_agents, daos, tools, webhooks, profiles
from app.config import config
from app.lib.logger import configure_logger, setup_uvicorn_logging
from app.middleware.logging import LoggingMiddleware
@@ -42,6 +42,7 @@ async def health_check():
app.include_router(tools.router)
app.include_router(webhooks.router)
app.include_router(agents.router)
+app.include_router(bitcoin_agents.router)
app.include_router(profiles.router)
app.include_router(daos.router)
diff --git a/app/services/bitcoin_agents/__init__.py b/app/services/bitcoin_agents/__init__.py
new file mode 100644
index 00000000..b7fc6f71
--- /dev/null
+++ b/app/services/bitcoin_agents/__init__.py
@@ -0,0 +1,9 @@
+"""Bitcoin Agents services.
+
+Provides face generation, lifecycle management, and XP services for Bitcoin Agents.
+"""
+
+from app.services.bitcoin_agents.face_service import BitcoinFaceService
+from app.services.bitcoin_agents.lifecycle_service import LifecycleService
+
+__all__ = ["BitcoinFaceService", "LifecycleService"]
diff --git a/app/services/bitcoin_agents/face_service.py b/app/services/bitcoin_agents/face_service.py
new file mode 100644
index 00000000..be034402
--- /dev/null
+++ b/app/services/bitcoin_agents/face_service.py
@@ -0,0 +1,155 @@
+"""Bitcoin Faces integration service.
+
+Fetches deterministic faces from bitcoinfaces.xyz API and manages caching.
+"""
+
+from datetime import datetime, timedelta
+from typing import Optional, Dict, Any
+import httpx
+
+from app.lib.logger import configure_logger
+
+logger = configure_logger(__name__)
+
+# Bitcoin Faces API
+BITCOIN_FACES_API = "https://bitcoinfaces.xyz/api"
+FACE_CACHE_TTL = timedelta(hours=24) # Cache faces for 24 hours
+
+
+class BitcoinFaceService:
+ """Service for fetching and caching Bitcoin Faces."""
+
+ def __init__(self):
+ self.client = httpx.AsyncClient(timeout=30.0)
+ # In-memory cache (TODO: Replace with Supabase storage)
+ self._cache: Dict[str, Dict[str, Any]] = {}
+
+ async def get_face_svg(self, address: str) -> Optional[str]:
+ """Get SVG face for an address.
+
+ Args:
+ address: Stacks address or any seed string
+
+ Returns:
+ SVG code string or None if failed
+ """
+ try:
+ # Check cache first
+ cached = self._get_cached(address, "svg")
+ if cached:
+ return cached
+
+ url = f"{BITCOIN_FACES_API}/get-svg-code"
+ response = await self.client.get(url, params={"name": address})
+
+ if response.status_code == 200:
+ svg_code = response.text
+ self._set_cache(address, "svg", svg_code)
+ logger.debug(f"Fetched SVG face for {address[:20]}...")
+ return svg_code
+ else:
+ logger.warning(
+ f"Failed to fetch SVG face: {response.status_code}",
+ extra={"address": address},
+ )
+ return None
+
+ except Exception as e:
+ logger.error(f"Error fetching SVG face: {e}", exc_info=e)
+ return None
+
+ async def get_face_image_url(self, address: str) -> str:
+ """Get image URL for an address.
+
+ This returns the direct URL - no need to fetch content.
+
+ Args:
+ address: Stacks address or any seed string
+
+ Returns:
+ Image URL string
+ """
+ return f"{BITCOIN_FACES_API}/get-image?name={address}"
+
+ async def get_face_data(self, address: str) -> Dict[str, Any]:
+ """Get all face data for an address.
+
+ Args:
+ address: Stacks address or any seed string
+
+ Returns:
+ Dict with svg_url, image_url, and cached_at
+ """
+ svg = await self.get_face_svg(address)
+ image_url = await self.get_face_image_url(address)
+
+ return {
+ "svg": svg,
+ "svg_url": f"{BITCOIN_FACES_API}/get-svg-code?name={address}",
+ "image_url": image_url,
+ "cached_at": datetime.utcnow().isoformat(),
+ }
+
+ def get_evolved_face_url(
+ self,
+ address: str,
+ level: str,
+ ) -> str:
+ """Get face URL with evolution overlay.
+
+ Different levels get different visual treatments:
+ - Hatchling: Base face
+ - Junior: +glow effect
+ - Senior: +crown
+ - Elder: +aura
+ - Legendary: +legendary frame
+
+ Args:
+ address: Stacks address
+ level: Evolution level (hatchling, junior, senior, elder, legendary)
+
+ Returns:
+ URL for the evolved face image
+ """
+ base_url = f"{BITCOIN_FACES_API}/get-image?name={address}"
+
+ # TODO: Implement overlay generation
+ # For now, return base URL with level parameter
+ # The frontend can handle overlay rendering
+
+ return f"{base_url}&level={level}"
+
+ def _get_cached(self, address: str, cache_type: str) -> Optional[str]:
+ """Get cached face data if still valid."""
+ key = f"{address}:{cache_type}"
+ if key in self._cache:
+ entry = self._cache[key]
+ if datetime.utcnow() - entry["cached_at"] < FACE_CACHE_TTL:
+ return entry["data"]
+ else:
+ del self._cache[key]
+ return None
+
+ def _set_cache(self, address: str, cache_type: str, data: str):
+ """Set cache entry."""
+ key = f"{address}:{cache_type}"
+ self._cache[key] = {
+ "data": data,
+ "cached_at": datetime.utcnow(),
+ }
+
+ async def close(self):
+ """Close the HTTP client."""
+ await self.client.aclose()
+
+
+# Singleton instance
+_face_service: Optional[BitcoinFaceService] = None
+
+
+def get_face_service() -> BitcoinFaceService:
+ """Get or create the face service singleton."""
+ global _face_service
+ if _face_service is None:
+ _face_service = BitcoinFaceService()
+ return _face_service
diff --git a/app/services/bitcoin_agents/lifecycle_service.py b/app/services/bitcoin_agents/lifecycle_service.py
new file mode 100644
index 00000000..47ad9736
--- /dev/null
+++ b/app/services/bitcoin_agents/lifecycle_service.py
@@ -0,0 +1,289 @@
+"""Bitcoin Agents lifecycle management service.
+
+Handles background jobs for death checking, alerts, and stats aggregation.
+"""
+
+from datetime import datetime
+from typing import List, Dict, Any, Optional
+
+from app.lib.logger import configure_logger
+from app.tools.bitcoin_agents import (
+ get_agent_state,
+ get_computed_state,
+ get_global_stats,
+ check_and_process_deaths,
+)
+
+logger = configure_logger(__name__)
+
+# Alert thresholds
+HUNGER_WARNING_THRESHOLD = 30 # Warn when hunger < 30
+HUNGER_CRITICAL_THRESHOLD = 10 # Critical when hunger < 10
+HEALTH_WARNING_THRESHOLD = 50 # Warn when health < 50
+
+
+class LifecycleService:
+ """Service for managing Bitcoin Agent lifecycles."""
+
+ def __init__(self, network: str = "mainnet"):
+ self.network = network
+ # Track warned agents to avoid spam
+ self._warned_agents: Dict[int, datetime] = {}
+
+ async def check_all_agents_for_death(self) -> Dict[str, Any]:
+ """Background job: Check all agents for death.
+
+ Should be run hourly or more frequently.
+
+ Returns:
+ Summary of the check including deaths processed
+ """
+ logger.info(f"Running death check job on {self.network}")
+
+ try:
+ result = await check_and_process_deaths(self.network)
+
+ logger.info(
+ f"Death check complete: {result['deaths_processed']} deaths processed",
+ extra={"result": result},
+ )
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Death check job failed: {e}", exc_info=e)
+ return {
+ "error": str(e),
+ "checked": 0,
+ "deaths_processed": 0,
+ }
+
+ async def get_hungry_agents(self, threshold: int = HUNGER_WARNING_THRESHOLD) -> List[Dict[str, Any]]:
+ """Get all agents with hunger below threshold.
+
+ Used for alert system.
+
+ Args:
+ threshold: Hunger level threshold (default 30)
+
+ Returns:
+ List of agents needing attention
+ """
+ try:
+ # TODO: Implement actual query
+ # 1. Get all alive agents from cache/db
+ # 2. For each, get computed state
+ # 3. Filter by hunger < threshold
+
+ hungry_agents: List[Dict[str, Any]] = []
+ logger.debug(f"Found {len(hungry_agents)} hungry agents (hunger < {threshold})")
+ return hungry_agents
+
+ except Exception as e:
+ logger.error(f"Failed to get hungry agents: {e}", exc_info=e)
+ return []
+
+ async def get_critical_agents(self) -> List[Dict[str, Any]]:
+ """Get agents in critical condition (very low hunger or health).
+
+ Returns:
+ List of agents needing immediate attention
+ """
+ try:
+ # TODO: Implement actual query
+ critical_agents: List[Dict[str, Any]] = []
+
+ # Filter for critical hunger or low health
+ logger.debug(f"Found {len(critical_agents)} critical agents")
+ return critical_agents
+
+ except Exception as e:
+ logger.error(f"Failed to get critical agents: {e}", exc_info=e)
+ return []
+
+ async def send_hunger_alerts(self) -> Dict[str, int]:
+ """Send alerts for hungry agents.
+
+ Should be run periodically (e.g., every 6 hours).
+ Avoids spamming by tracking warned agents.
+
+ Returns:
+ Summary of alerts sent
+ """
+ logger.info(f"Running hunger alert job on {self.network}")
+
+ warnings_sent = 0
+ critical_sent = 0
+
+ try:
+ # Get critical agents first
+ critical = await self.get_critical_agents()
+ for agent in critical:
+ agent_id = agent.get("agent_id")
+ if agent_id and self._should_alert(agent_id, critical=True):
+ await self._send_critical_alert(agent)
+ critical_sent += 1
+ self._mark_alerted(agent_id)
+
+ # Get warning-level agents
+ hungry = await self.get_hungry_agents(HUNGER_WARNING_THRESHOLD)
+ for agent in hungry:
+ agent_id = agent.get("agent_id")
+ if agent_id and self._should_alert(agent_id, critical=False):
+ await self._send_warning_alert(agent)
+ warnings_sent += 1
+ self._mark_alerted(agent_id)
+
+ logger.info(
+ f"Alert job complete: {critical_sent} critical, {warnings_sent} warnings",
+ )
+
+ return {
+ "critical_alerts": critical_sent,
+ "warning_alerts": warnings_sent,
+ }
+
+ except Exception as e:
+ logger.error(f"Alert job failed: {e}", exc_info=e)
+ return {
+ "error": str(e),
+ "critical_alerts": 0,
+ "warning_alerts": 0,
+ }
+
+ async def aggregate_stats(self) -> Dict[str, Any]:
+ """Aggregate and cache global statistics.
+
+ Should be run periodically to update cached stats.
+
+ Returns:
+ Current statistics
+ """
+ try:
+ stats = await get_global_stats(self.network)
+
+ logger.info(
+ f"Stats aggregated: {stats.get('total-agents', 0)} total, "
+ f"{stats.get('alive-count', 0)} alive, "
+ f"{stats.get('total-deaths', 0)} deaths",
+ )
+
+ return {
+ "total_agents": stats.get("total-agents", 0),
+ "alive_count": stats.get("alive-count", 0),
+ "total_deaths": stats.get("total-deaths", 0),
+ "total_feedings": stats.get("total-feedings", 0),
+ "aggregated_at": datetime.utcnow().isoformat(),
+ "network": self.network,
+ }
+
+ except Exception as e:
+ logger.error(f"Stats aggregation failed: {e}", exc_info=e)
+ return {"error": str(e)}
+
+ def _should_alert(self, agent_id: int, critical: bool) -> bool:
+ """Check if we should send an alert for this agent.
+
+ Avoids spamming by enforcing cooldown periods.
+ """
+ from datetime import timedelta
+
+ if agent_id not in self._warned_agents:
+ return True
+
+ last_warned = self._warned_agents[agent_id]
+ cooldown = timedelta(hours=1) if critical else timedelta(hours=6)
+
+ return datetime.utcnow() - last_warned > cooldown
+
+ def _mark_alerted(self, agent_id: int):
+ """Mark an agent as having been alerted."""
+ self._warned_agents[agent_id] = datetime.utcnow()
+
+ async def _send_warning_alert(self, agent: Dict[str, Any]):
+ """Send a warning alert for a hungry agent.
+
+ TODO: Implement actual notification (email, push, etc.)
+ """
+ logger.info(
+ f"HUNGER WARNING: Agent {agent.get('agent_id')} ({agent.get('name')}) "
+ f"has low hunger: {agent.get('computed_hunger')}%"
+ )
+
+ async def _send_critical_alert(self, agent: Dict[str, Any]):
+ """Send a critical alert for an agent in danger.
+
+ TODO: Implement actual notification (email, push, etc.)
+ """
+ logger.warning(
+ f"CRITICAL: Agent {agent.get('agent_id')} ({agent.get('name')}) "
+ f"is in critical condition! Hunger: {agent.get('computed_hunger')}%, "
+ f"Health: {agent.get('computed_health')}%"
+ )
+
+
+# Singleton instances per network
+_services: Dict[str, LifecycleService] = {}
+
+
+def get_lifecycle_service(network: str = "mainnet") -> LifecycleService:
+ """Get or create the lifecycle service for a network."""
+ if network not in _services:
+ _services[network] = LifecycleService(network)
+ return _services[network]
+
+
+# =============================================================================
+# Background Job Registration
+# =============================================================================
+
+
+async def death_check_job():
+ """Background job function for death checking.
+
+ Register this with the job management system.
+ """
+ mainnet_service = get_lifecycle_service("mainnet")
+ testnet_service = get_lifecycle_service("testnet")
+
+ mainnet_result = await mainnet_service.check_all_agents_for_death()
+ testnet_result = await testnet_service.check_all_agents_for_death()
+
+ return {
+ "mainnet": mainnet_result,
+ "testnet": testnet_result,
+ }
+
+
+async def hunger_alert_job():
+ """Background job function for hunger alerts.
+
+ Register this with the job management system.
+ """
+ mainnet_service = get_lifecycle_service("mainnet")
+ testnet_service = get_lifecycle_service("testnet")
+
+ mainnet_result = await mainnet_service.send_hunger_alerts()
+ testnet_result = await testnet_service.send_hunger_alerts()
+
+ return {
+ "mainnet": mainnet_result,
+ "testnet": testnet_result,
+ }
+
+
+async def stats_aggregation_job():
+ """Background job function for stats aggregation.
+
+ Register this with the job management system.
+ """
+ mainnet_service = get_lifecycle_service("mainnet")
+ testnet_service = get_lifecycle_service("testnet")
+
+ mainnet_result = await mainnet_service.aggregate_stats()
+ testnet_result = await testnet_service.aggregate_stats()
+
+ return {
+ "mainnet": mainnet_result,
+ "testnet": testnet_result,
+ }
diff --git a/app/tools/bitcoin_agents.py b/app/tools/bitcoin_agents.py
new file mode 100644
index 00000000..f32390bd
--- /dev/null
+++ b/app/tools/bitcoin_agents.py
@@ -0,0 +1,471 @@
+"""Bitcoin Agents tools for Stacks contract interaction.
+
+Provides tools for minting, feeding, and managing Bitcoin Agents on-chain.
+"""
+
+from typing import Any, Dict, Optional, Type
+
+from langchain.tools import BaseTool
+from pydantic import BaseModel, Field
+
+from app.lib.logger import configure_logger
+
+logger = configure_logger(__name__)
+
+# Contract addresses (to be updated after deployment)
+CONTRACT_MAINNET = "SP000000000000000000000000000000.bitcoin-agents"
+CONTRACT_TESTNET = "ST000000000000000000000000000000.bitcoin-agents"
+
+# XP thresholds for levels
+XP_THRESHOLDS = {
+ "hatchling": 0,
+ "junior": 500,
+ "senior": 2000,
+ "elder": 10000,
+ "legendary": 50000,
+}
+
+
+def get_contract_address(network: str = "mainnet") -> str:
+ """Get the contract address for the specified network."""
+ return CONTRACT_MAINNET if network == "mainnet" else CONTRACT_TESTNET
+
+
+def xp_to_level(xp: int) -> str:
+ """Convert XP to level name."""
+ if xp >= XP_THRESHOLDS["legendary"]:
+ return "legendary"
+ elif xp >= XP_THRESHOLDS["elder"]:
+ return "elder"
+ elif xp >= XP_THRESHOLDS["senior"]:
+ return "senior"
+ elif xp >= XP_THRESHOLDS["junior"]:
+ return "junior"
+ return "hatchling"
+
+
+def xp_to_level_number(xp: int) -> int:
+ """Convert XP to level number (0-4)."""
+ if xp >= XP_THRESHOLDS["legendary"]:
+ return 4
+ elif xp >= XP_THRESHOLDS["elder"]:
+ return 3
+ elif xp >= XP_THRESHOLDS["senior"]:
+ return 2
+ elif xp >= XP_THRESHOLDS["junior"]:
+ return 1
+ return 0
+
+
+# =============================================================================
+# Contract Read Functions
+# =============================================================================
+
+
+async def get_agent_state(agent_id: int, network: str = "mainnet") -> Optional[Dict[str, Any]]:
+ """Get agent state from contract.
+
+ Calls the read-only get-agent function.
+
+ Args:
+ agent_id: The on-chain agent ID
+ network: mainnet or testnet
+
+ Returns:
+ Agent state dict or None if not found
+ """
+ try:
+ # TODO: Implement actual contract call using Hiro API or agent-tools-ts
+ # For now, return placeholder
+ logger.info(f"Getting agent state for agent {agent_id} on {network}")
+
+ # Example of what the response should look like:
+ # return {
+ # "owner": "SP...",
+ # "name": "AgentName",
+ # "hunger": 100,
+ # "health": 100,
+ # "xp": 0,
+ # "birth-block": 12345,
+ # "last-fed": 12345,
+ # "total-fed-count": 0,
+ # "alive": True,
+ # }
+
+ return None
+
+ except Exception as e:
+ logger.error(f"Failed to get agent state: {e}", exc_info=e)
+ return None
+
+
+async def get_computed_state(agent_id: int, network: str = "mainnet") -> Optional[Dict[str, Any]]:
+ """Get computed hunger/health state from contract.
+
+ Calls the read-only get-computed-state function which calculates
+ current hunger/health based on blocks elapsed since last fed.
+
+ Args:
+ agent_id: The on-chain agent ID
+ network: mainnet or testnet
+
+ Returns:
+ Computed state dict or None if not found
+ """
+ try:
+ logger.info(f"Getting computed state for agent {agent_id} on {network}")
+
+ # TODO: Implement actual contract call
+ # Example response:
+ # return {
+ # "hunger": 85,
+ # "health": 100,
+ # "alive": True,
+ # }
+
+ return None
+
+ except Exception as e:
+ logger.error(f"Failed to get computed state: {e}", exc_info=e)
+ return None
+
+
+async def get_death_certificate(agent_id: int, network: str = "mainnet") -> Optional[Dict[str, Any]]:
+ """Get death certificate for a dead agent.
+
+ Args:
+ agent_id: The on-chain agent ID
+ network: mainnet or testnet
+
+ Returns:
+ Death certificate dict or None if not found/alive
+ """
+ try:
+ logger.info(f"Getting death certificate for agent {agent_id} on {network}")
+
+ # TODO: Implement actual contract call
+ return None
+
+ except Exception as e:
+ logger.error(f"Failed to get death certificate: {e}", exc_info=e)
+ return None
+
+
+async def get_global_stats(network: str = "mainnet") -> Dict[str, int]:
+ """Get global statistics from contract.
+
+ Args:
+ network: mainnet or testnet
+
+ Returns:
+ Stats dict with total-agents, total-deaths, total-feedings, alive-count
+ """
+ try:
+ logger.info(f"Getting global stats on {network}")
+
+ # TODO: Implement actual contract call
+ return {
+ "total-agents": 0,
+ "total-deaths": 0,
+ "total-feedings": 0,
+ "alive-count": 0,
+ }
+
+ except Exception as e:
+ logger.error(f"Failed to get global stats: {e}", exc_info=e)
+ return {
+ "total-agents": 0,
+ "total-deaths": 0,
+ "total-feedings": 0,
+ "alive-count": 0,
+ }
+
+
+# =============================================================================
+# Contract Write Functions (Transaction Builders)
+# =============================================================================
+
+
+async def build_mint_agent_tx(
+ owner: str,
+ name: str,
+ network: str = "mainnet",
+) -> Dict[str, Any]:
+ """Build a mint-agent transaction.
+
+ Args:
+ owner: Stacks address of the new agent owner
+ name: Name for the agent (max 64 chars)
+ network: mainnet or testnet
+
+ Returns:
+ Transaction parameters dict for signing
+ """
+ contract_address = get_contract_address(network)
+
+ return {
+ "contract_address": contract_address,
+ "function_name": "mint-agent",
+ "function_args": [
+ {"type": "string-utf8", "value": name},
+ ],
+ "post_conditions": [], # TODO: Add sBTC post-conditions
+ "network": network,
+ }
+
+
+async def build_feed_agent_tx(
+ agent_id: int,
+ food_tier: int,
+ network: str = "mainnet",
+) -> Dict[str, Any]:
+ """Build a feed-agent transaction.
+
+ Args:
+ agent_id: The on-chain agent ID
+ food_tier: Food tier (1=Basic, 2=Premium, 3=Gourmet)
+ network: mainnet or testnet
+
+ Returns:
+ Transaction parameters dict for signing
+ """
+ contract_address = get_contract_address(network)
+
+ return {
+ "contract_address": contract_address,
+ "function_name": "feed-agent",
+ "function_args": [
+ {"type": "uint", "value": agent_id},
+ {"type": "uint", "value": food_tier},
+ ],
+ "post_conditions": [], # TODO: Add sBTC post-conditions
+ "network": network,
+ }
+
+
+async def build_check_death_tx(
+ agent_id: int,
+ network: str = "mainnet",
+) -> Dict[str, Any]:
+ """Build a check-death transaction.
+
+ This is a public function anyone can call to process an agent's death.
+
+ Args:
+ agent_id: The on-chain agent ID
+ network: mainnet or testnet
+
+ Returns:
+ Transaction parameters dict for signing
+ """
+ contract_address = get_contract_address(network)
+
+ return {
+ "contract_address": contract_address,
+ "function_name": "check-death",
+ "function_args": [
+ {"type": "uint", "value": agent_id},
+ ],
+ "post_conditions": [],
+ "network": network,
+ }
+
+
+async def build_write_epitaph_tx(
+ agent_id: int,
+ epitaph: str,
+ network: str = "mainnet",
+) -> Dict[str, Any]:
+ """Build a write-epitaph transaction.
+
+ Only the owner can write an epitaph for their dead agent.
+
+ Args:
+ agent_id: The on-chain agent ID
+ epitaph: Memorial text (max 256 chars)
+ network: mainnet or testnet
+
+ Returns:
+ Transaction parameters dict for signing
+ """
+ contract_address = get_contract_address(network)
+
+ return {
+ "contract_address": contract_address,
+ "function_name": "write-epitaph",
+ "function_args": [
+ {"type": "uint", "value": agent_id},
+ {"type": "string-utf8", "value": epitaph},
+ ],
+ "post_conditions": [],
+ "network": network,
+ }
+
+
+async def build_add_xp_tx(
+ agent_id: int,
+ xp_amount: int,
+ network: str = "mainnet",
+) -> Dict[str, Any]:
+ """Build an add-xp transaction.
+
+ Only the owner can add XP to their agent.
+
+ Args:
+ agent_id: The on-chain agent ID
+ xp_amount: Amount of XP to add
+ network: mainnet or testnet
+
+ Returns:
+ Transaction parameters dict for signing
+ """
+ contract_address = get_contract_address(network)
+
+ return {
+ "contract_address": contract_address,
+ "function_name": "add-xp",
+ "function_args": [
+ {"type": "uint", "value": agent_id},
+ {"type": "uint", "value": xp_amount},
+ ],
+ "post_conditions": [],
+ "network": network,
+ }
+
+
+# =============================================================================
+# Batch Operations
+# =============================================================================
+
+
+async def check_and_process_deaths(network: str = "mainnet") -> Dict[str, Any]:
+ """Check all agents for death and process any that should die.
+
+ This is a background job that iterates through agents and calls
+ check-death for any with computed health of 0.
+
+ Args:
+ network: mainnet or testnet
+
+ Returns:
+ Summary of deaths processed
+ """
+ try:
+ logger.info(f"Checking for agent deaths on {network}")
+
+ # TODO: Implement actual batch check
+ # 1. Get all alive agents
+ # 2. For each, call get-computed-state
+ # 3. If health == 0, call check-death
+ # 4. Record death certificates
+
+ return {
+ "checked": 0,
+ "deaths_processed": 0,
+ "network": network,
+ }
+
+ except Exception as e:
+ logger.error(f"Failed to process deaths: {e}", exc_info=e)
+ return {
+ "checked": 0,
+ "deaths_processed": 0,
+ "error": str(e),
+ "network": network,
+ }
+
+
+# =============================================================================
+# LangChain Tools for MCP Integration
+# =============================================================================
+
+
+class GetAgentInput(BaseModel):
+ """Input for GetAgentTool."""
+
+ agent_id: int = Field(..., description="The on-chain agent ID")
+ network: str = Field(default="mainnet", description="Network (mainnet/testnet)")
+
+
+class GetAgentTool(BaseTool):
+ """Tool for getting Bitcoin Agent state."""
+
+ name: str = "bitcoin_agents_get_agent"
+ description: str = "Get the state of a Bitcoin Agent including hunger, health, XP, and level"
+ args_schema: Type[BaseModel] = GetAgentInput
+ return_direct: bool = False
+
+ def _run(self, agent_id: int, network: str = "mainnet") -> str:
+ """Get agent state synchronously."""
+ import asyncio
+
+ result = asyncio.run(get_agent_state(agent_id, network))
+ if result is None:
+ return f"Agent {agent_id} not found on {network}"
+ return str(result)
+
+ async def _arun(self, agent_id: int, network: str = "mainnet") -> str:
+ """Get agent state asynchronously."""
+ result = await get_agent_state(agent_id, network)
+ if result is None:
+ return f"Agent {agent_id} not found on {network}"
+ return str(result)
+
+
+class CheckAgentStatusInput(BaseModel):
+ """Input for CheckAgentStatusTool."""
+
+ agent_id: int = Field(..., description="The on-chain agent ID")
+ network: str = Field(default="mainnet", description="Network (mainnet/testnet)")
+
+
+class CheckAgentStatusTool(BaseTool):
+ """Tool for checking Bitcoin Agent computed status."""
+
+ name: str = "bitcoin_agents_check_status"
+ description: str = "Check the current computed hunger and health of a Bitcoin Agent"
+ args_schema: Type[BaseModel] = CheckAgentStatusInput
+ return_direct: bool = False
+
+ def _run(self, agent_id: int, network: str = "mainnet") -> str:
+ """Check agent status synchronously."""
+ import asyncio
+
+ result = asyncio.run(get_computed_state(agent_id, network))
+ if result is None:
+ return f"Agent {agent_id} not found on {network}"
+ return str(result)
+
+ async def _arun(self, agent_id: int, network: str = "mainnet") -> str:
+ """Check agent status asynchronously."""
+ result = await get_computed_state(agent_id, network)
+ if result is None:
+ return f"Agent {agent_id} not found on {network}"
+ return str(result)
+
+
+class GetGlobalStatsInput(BaseModel):
+ """Input for GetGlobalStatsTool."""
+
+ network: str = Field(default="mainnet", description="Network (mainnet/testnet)")
+
+
+class GetGlobalStatsTool(BaseTool):
+ """Tool for getting Bitcoin Agents global statistics."""
+
+ name: str = "bitcoin_agents_get_stats"
+ description: str = "Get global statistics for Bitcoin Agents (total agents, deaths, feedings)"
+ args_schema: Type[BaseModel] = GetGlobalStatsInput
+ return_direct: bool = False
+
+ def _run(self, network: str = "mainnet") -> str:
+ """Get global stats synchronously."""
+ import asyncio
+
+ result = asyncio.run(get_global_stats(network))
+ return str(result)
+
+ async def _arun(self, network: str = "mainnet") -> str:
+ """Get global stats asynchronously."""
+ result = await get_global_stats(network)
+ return str(result)
From 32f0f7f43aa0e90244208f3057b314126f6aecab Mon Sep 17 00:00:00 2001
From: pbtc21
Date: Wed, 21 Jan 2026 02:30:21 +0000
Subject: [PATCH 2/4] feat(bitcoin-agents): add MCP integration with tier-based
access control
Phase 3 implementation:
- Add tier-based tool access (TIER_CAPABILITIES) with 5 levels:
- Hatchling (0): Read-only operations
- Junior (1): Simple transfers
- Senior (2): Trading and contracts
- Elder (3): DAO participation and social
- Legendary (4): Full autonomy
- Create MCPService with execute_action and tool initialization
- Integrate with existing LangChain tools via tools_factory
- Add XP rewards for completed actions (ACTION_XP_REWARDS)
- Implement agent-to-agent visit interactions with rate limiting
- Add API endpoints:
- GET /{agent_id}/capabilities
- POST /{agent_id}/execute
- POST /{agent_id}/visit/{host_agent_id}
- GET /tier-info
Co-Authored-By: Claude Opus 4.5
---
app/api/bitcoin_agents.py | 171 +++++++
app/services/bitcoin_agents/__init__.py | 5 +-
app/services/bitcoin_agents/mcp_service.py | 549 +++++++++++++++++++++
app/tools/tools_factory.py | 10 +
4 files changed, 733 insertions(+), 2 deletions(-)
create mode 100644 app/services/bitcoin_agents/mcp_service.py
diff --git a/app/api/bitcoin_agents.py b/app/api/bitcoin_agents.py
index 5b619385..00507774 100644
--- a/app/api/bitcoin_agents.py
+++ b/app/api/bitcoin_agents.py
@@ -393,3 +393,174 @@ async def write_epitaph_request(
exc_info=e,
)
raise HTTPException(status_code=500, detail=f"Failed to write epitaph: {str(e)}")
+
+
+# =============================================================================
+# MCP Integration Endpoints
+# =============================================================================
+
+
+@router.get("/{agent_id}/capabilities")
+async def get_agent_capabilities(
+ agent_id: int,
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Get available MCP tools for an agent based on their level.
+
+ Returns tools grouped by category that the agent can use.
+ """
+ try:
+ from app.services.bitcoin_agents.mcp_service import get_mcp_service
+
+ logger.debug("Getting agent capabilities", extra={"agent_id": agent_id, "network": network})
+
+ mcp_service = get_mcp_service(network)
+ result = await mcp_service.get_available_tools(agent_id)
+
+ if not result.get("success"):
+ raise HTTPException(status_code=404, detail=result.get("error", "Unknown error"))
+
+ return JSONResponse(content=result)
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(
+ "Failed to get agent capabilities",
+ extra={"agent_id": agent_id, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to get capabilities: {str(e)}")
+
+
+@router.post("/{agent_id}/execute")
+async def execute_agent_action(
+ agent_id: int,
+ tool_name: str = Query(..., description="Name of the tool to execute"),
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Execute an MCP action on behalf of an agent.
+
+ Checks tier-based access before execution.
+ Awards XP on successful completion.
+
+ Note: Tool arguments should be passed in the request body for complex operations.
+ This endpoint is for simple tool invocations.
+ """
+ try:
+ from app.services.bitcoin_agents.mcp_service import get_mcp_service
+
+ logger.info(
+ "Execute agent action",
+ extra={"agent_id": agent_id, "tool": tool_name, "network": network},
+ )
+
+ mcp_service = get_mcp_service(network)
+
+ # Execute with empty args - full implementation would parse request body
+ # Tools are auto-initialized based on agent level via get_tools_for_agent
+ result = await mcp_service.execute_action(
+ agent_id=agent_id,
+ tool_name=tool_name,
+ tool_args={},
+ tools_map=None, # Auto-initialized by MCP service
+ )
+
+ if not result.get("success"):
+ status_code = 403 if "requires" in result.get("error", "") else 400
+ return JSONResponse(status_code=status_code, content=result)
+
+ return JSONResponse(content=result)
+
+ except Exception as e:
+ logger.error(
+ "Failed to execute agent action",
+ extra={"agent_id": agent_id, "tool": tool_name, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to execute action: {str(e)}")
+
+
+@router.post("/{agent_id}/visit/{host_agent_id}")
+async def agent_visit(
+ agent_id: int,
+ host_agent_id: int,
+ network: str = Query("mainnet", description="Network (mainnet/testnet)"),
+) -> JSONResponse:
+ """Have one agent visit another agent.
+
+ Both agents gain XP from the interaction.
+ Rate limited to prevent farming (once per hour per pair).
+ """
+ try:
+ from app.services.bitcoin_agents.mcp_service import get_mcp_service
+
+ logger.info(
+ "Agent visit",
+ extra={"visitor": agent_id, "host": host_agent_id, "network": network},
+ )
+
+ mcp_service = get_mcp_service(network)
+ result = await mcp_service.agent_visit(agent_id, host_agent_id)
+
+ if not result.get("success"):
+ return JSONResponse(status_code=400, content=result)
+
+ return JSONResponse(content=result)
+
+ except Exception as e:
+ logger.error(
+ "Failed to process agent visit",
+ extra={"visitor": agent_id, "host": host_agent_id, "error": str(e)},
+ exc_info=e,
+ )
+ raise HTTPException(status_code=500, detail=f"Failed to process visit: {str(e)}")
+
+
+@router.get("/tier-info")
+async def get_tier_info() -> JSONResponse:
+ """Get information about evolution tiers and their capabilities.
+
+ Returns XP thresholds and tools available at each tier.
+ """
+ from app.services.bitcoin_agents.mcp_service import TIER_CAPABILITIES, get_tools_for_level
+
+ tiers = [
+ {
+ "level": 0,
+ "name": "Hatchling",
+ "xp_required": 0,
+ "tools_count": len(get_tools_for_level(0)),
+ "new_capabilities": ["Read-only operations", "Balance queries", "Contract info"],
+ },
+ {
+ "level": 1,
+ "name": "Junior",
+ "xp_required": 500,
+ "tools_count": len(get_tools_for_level(1)),
+ "new_capabilities": ["STX transfers", "Token transfers", "Deposits"],
+ },
+ {
+ "level": 2,
+ "name": "Senior",
+ "xp_required": 2000,
+ "tools_count": len(get_tools_for_level(2)),
+ "new_capabilities": ["DEX trading (Faktory, Bitflow)", "Contract approvals"],
+ },
+ {
+ "level": 3,
+ "name": "Elder",
+ "xp_required": 10000,
+ "tools_count": len(get_tools_for_level(3)),
+ "new_capabilities": ["DAO voting", "Proposals", "Social posting"],
+ },
+ {
+ "level": 4,
+ "name": "Legendary",
+ "xp_required": 50000,
+ "tools_count": len(get_tools_for_level(4)),
+ "new_capabilities": ["Full autonomy", "Deploy agents", "Create proposals"],
+ },
+ ]
+
+ return JSONResponse(content={"tiers": tiers})
diff --git a/app/services/bitcoin_agents/__init__.py b/app/services/bitcoin_agents/__init__.py
index b7fc6f71..624bd4f6 100644
--- a/app/services/bitcoin_agents/__init__.py
+++ b/app/services/bitcoin_agents/__init__.py
@@ -1,9 +1,10 @@
"""Bitcoin Agents services.
-Provides face generation, lifecycle management, and XP services for Bitcoin Agents.
+Provides face generation, lifecycle management, MCP integration, and XP services for Bitcoin Agents.
"""
from app.services.bitcoin_agents.face_service import BitcoinFaceService
from app.services.bitcoin_agents.lifecycle_service import LifecycleService
+from app.services.bitcoin_agents.mcp_service import MCPService, get_mcp_service
-__all__ = ["BitcoinFaceService", "LifecycleService"]
+__all__ = ["BitcoinFaceService", "LifecycleService", "MCPService", "get_mcp_service"]
diff --git a/app/services/bitcoin_agents/mcp_service.py b/app/services/bitcoin_agents/mcp_service.py
new file mode 100644
index 00000000..7ffb941e
--- /dev/null
+++ b/app/services/bitcoin_agents/mcp_service.py
@@ -0,0 +1,549 @@
+"""Bitcoin Agents MCP Integration Service.
+
+Provides tier-based access control for MCP tools based on agent evolution level.
+"""
+
+from datetime import datetime, timedelta
+from typing import Any, Dict, List, Optional, Set
+from uuid import UUID
+
+from langchain.tools.base import BaseTool as LangChainBaseTool
+
+from app.lib.logger import configure_logger
+from app.tools.bitcoin_agents import (
+ get_agent_state,
+ get_computed_state,
+ xp_to_level,
+ xp_to_level_number,
+ build_add_xp_tx,
+)
+from app.tools.tools_factory import initialize_tools, filter_tools_by_names
+
+logger = configure_logger(__name__)
+
+# =============================================================================
+# Tier-Based Tool Access Configuration
+# =============================================================================
+
+# Level numbers: 0=Hatchling, 1=Junior, 2=Senior, 3=Elder, 4=Legendary
+LEVEL_HATCHLING = 0
+LEVEL_JUNIOR = 1
+LEVEL_SENIOR = 2
+LEVEL_ELDER = 3
+LEVEL_LEGENDARY = 4
+
+# Tools available at each tier (cumulative - higher tiers get all lower tier tools)
+TIER_CAPABILITIES: Dict[int, Set[str]] = {
+ # Hatchling (Level 0): Read-only operations
+ LEVEL_HATCHLING: {
+ # Balance and info queries
+ "stacks_get_address_balance",
+ "stacks_get_contract_info",
+ "contracts_fetch_sip10_info",
+ "contracts_fetch_source_code",
+ "wallet_get_my_balance",
+ "wallet_get_my_address",
+ "wallet_get_my_transactions",
+ # DAO read operations
+ "dao_action_get_proposal",
+ "dao_action_get_total_proposals",
+ "dao_action_get_vote_record",
+ "dao_action_get_vote_records",
+ "dao_action_get_veto_vote_record",
+ "dao_action_get_voting_configuration",
+ "dao_action_get_voting_power",
+ "dao_action_get_liquid_supply",
+ "dao_charter_get_current_charter",
+ # Database reads
+ "database_get_dao_list",
+ "database_get_dao_get_by_name",
+ "database_list_scheduled_tasks",
+ # Market data
+ "lunarcrush_get_token_metrics",
+ "lunarcrush_search",
+ "lunarcrush_get_token_metadata",
+ # Transaction lookup
+ "stacks_get_transaction_details",
+ "stacks_get_transactions_by_address",
+ # Agent account reads
+ "agent_account_get_configuration",
+ "agent_account_is_approved_contract",
+ # Bitcoin Agent reads
+ "bitcoin_agents_get_agent",
+ "bitcoin_agents_check_status",
+ "bitcoin_agents_get_stats",
+ },
+
+ # Junior (Level 1): Simple transfers
+ LEVEL_JUNIOR: {
+ "wallet_send_stx",
+ "wallet_send_sip10",
+ "wallet_fund_my_wallet_faucet",
+ "agent_account_deposit_stx",
+ "agent_account_deposit_ft",
+ },
+
+ # Senior (Level 2): Trading and contracts
+ LEVEL_SENIOR: {
+ "faktory_exec_buy",
+ "faktory_exec_buy_stx",
+ "faktory_exec_sell",
+ "faktory_get_sbtc",
+ "bitflow_execute_trade",
+ "agent_account_faktory_buy_asset",
+ "agent_account_faktory_sell_asset",
+ "agent_account_approve_contract",
+ "agent_account_revoke_contract",
+ },
+
+ # Elder (Level 3): DAO participation
+ LEVEL_ELDER: {
+ "dao_action_vote_on_proposal",
+ "dao_action_veto_proposal",
+ "dao_action_conclude_proposal",
+ "dao_propose_action_send_message",
+ "agent_account_vote_on_action_proposal",
+ "agent_account_veto_action_proposal",
+ "agent_account_conclude_action_proposal",
+ # Social actions
+ "twitter_post_tweet",
+ "telegram_send_nofication_to_user",
+ # Task scheduling
+ "database_add_scheduled_task",
+ "database_update_scheduled_task",
+ "database_delete_scheduled_task",
+ },
+
+ # Legendary (Level 4): Full autonomy
+ LEVEL_LEGENDARY: {
+ "agent_account_deploy",
+ "agent_account_create_action_proposal",
+ "x_credentials",
+ },
+}
+
+# XP rewards for completing actions
+ACTION_XP_REWARDS: Dict[str, int] = {
+ # Read operations (minimal XP)
+ "stacks_get_address_balance": 1,
+ "stacks_get_contract_info": 1,
+ "wallet_get_my_balance": 1,
+ # Transfers
+ "wallet_send_stx": 15,
+ "wallet_send_sip10": 15,
+ # Trading
+ "faktory_exec_buy": 25,
+ "faktory_exec_sell": 25,
+ "bitflow_execute_trade": 30,
+ # DAO participation
+ "dao_action_vote_on_proposal": 50,
+ "dao_propose_action_send_message": 75,
+ # Social
+ "twitter_post_tweet": 20,
+ # Advanced operations
+ "agent_account_deploy": 100,
+ "agent_account_create_action_proposal": 100,
+}
+
+# Default XP for actions not in the mapping
+DEFAULT_ACTION_XP = 5
+
+
+def get_tools_for_level(level: int) -> Set[str]:
+ """Get all tools available for a given level.
+
+ Tools are cumulative - higher levels get all lower level tools.
+
+ Args:
+ level: Agent level (0-4)
+
+ Returns:
+ Set of tool names available at this level
+ """
+ available_tools: Set[str] = set()
+
+ for tier_level, tools in TIER_CAPABILITIES.items():
+ if tier_level <= level:
+ available_tools.update(tools)
+
+ return available_tools
+
+
+def can_use_tool(agent_level: int, tool_name: str) -> bool:
+ """Check if an agent at a given level can use a specific tool.
+
+ Args:
+ agent_level: Agent's current level (0-4)
+ tool_name: Name of the tool to check
+
+ Returns:
+ True if the agent can use this tool
+ """
+ available = get_tools_for_level(agent_level)
+ return tool_name in available
+
+
+def get_required_level_for_tool(tool_name: str) -> Optional[int]:
+ """Get the minimum level required to use a tool.
+
+ Args:
+ tool_name: Name of the tool
+
+ Returns:
+ Minimum level required, or None if tool not found
+ """
+ for level in range(5): # 0 to 4
+ if tool_name in TIER_CAPABILITIES.get(level, set()):
+ return level
+ return None
+
+
+def get_xp_reward_for_action(tool_name: str) -> int:
+ """Get XP reward for completing an action.
+
+ Args:
+ tool_name: Name of the tool/action
+
+ Returns:
+ XP reward amount
+ """
+ return ACTION_XP_REWARDS.get(tool_name, DEFAULT_ACTION_XP)
+
+
+# =============================================================================
+# MCP Service Class
+# =============================================================================
+
+
+class MCPService:
+ """Service for executing MCP actions with tier-based access control."""
+
+ def __init__(self, network: str = "mainnet"):
+ self.network = network
+ # Rate limiting for actions
+ self._action_cooldowns: Dict[str, datetime] = {} # agent_id:action -> last_used
+ self._interaction_cooldowns: Dict[str, datetime] = {} # agent_pair -> last_interaction
+ # Cache initialized tools per agent
+ self._tools_cache: Dict[int, Dict[str, LangChainBaseTool]] = {}
+
+ def get_tools_for_agent(
+ self,
+ agent_id: int,
+ agent_level: int,
+ wallet_id: Optional[UUID] = None,
+ ) -> Dict[str, LangChainBaseTool]:
+ """Initialize and return tools for a Bitcoin Agent based on their level.
+
+ Uses the existing tools factory but filters based on tier access.
+
+ Args:
+ agent_id: The Bitcoin Agent's on-chain ID
+ agent_level: Agent's current level (0-4)
+ wallet_id: Optional wallet ID for the agent (for tool initialization)
+
+ Returns:
+ Dictionary of tools available to this agent
+ """
+ # Check cache first
+ if agent_id in self._tools_cache:
+ return self._tools_cache[agent_id]
+
+ # Get all tools available at this level
+ allowed_tools = get_tools_for_level(agent_level)
+
+ # Initialize base tools (without profile/wallet for read-only tools)
+ # In a full implementation, the Bitcoin Agent would have an associated wallet
+ all_tools = initialize_tools(profile=None, agent_id=None)
+
+ # Filter to only allowed tools
+ filtered_tools = filter_tools_by_names(list(allowed_tools), all_tools)
+
+ # Cache for this agent
+ self._tools_cache[agent_id] = filtered_tools
+
+ logger.debug(
+ f"Initialized {len(filtered_tools)} tools for agent {agent_id} (level {agent_level})",
+ extra={"agent_id": agent_id, "level": agent_level, "tools_count": len(filtered_tools)},
+ )
+
+ return filtered_tools
+
+ def invalidate_tools_cache(self, agent_id: int):
+ """Clear cached tools for an agent (call when level changes)."""
+ if agent_id in self._tools_cache:
+ del self._tools_cache[agent_id]
+
+ async def execute_action(
+ self,
+ agent_id: int,
+ tool_name: str,
+ tool_args: Dict[str, Any],
+ tools_map: Optional[Dict[str, Any]] = None,
+ ) -> Dict[str, Any]:
+ """Execute an MCP action with tier-based access control.
+
+ Args:
+ agent_id: The Bitcoin Agent's on-chain ID
+ tool_name: Name of the tool to execute
+ tool_args: Arguments to pass to the tool
+ tools_map: Optional pre-initialized tools map
+
+ Returns:
+ Result dict with success status, result/error, and XP earned
+ """
+ try:
+ # 1. Get agent state and level
+ agent_state = await get_agent_state(agent_id, self.network)
+ if not agent_state:
+ return {
+ "success": False,
+ "error": f"Agent {agent_id} not found",
+ "xp_earned": 0,
+ }
+
+ if not agent_state.get("alive", True):
+ return {
+ "success": False,
+ "error": f"Agent {agent_id} is dead and cannot perform actions",
+ "xp_earned": 0,
+ }
+
+ xp = agent_state.get("xp", 0)
+ level = xp_to_level_number(xp)
+
+ # 2. Check tier access
+ if not can_use_tool(level, tool_name):
+ required_level = get_required_level_for_tool(tool_name)
+ level_name = xp_to_level(xp)
+ required_name = ["Hatchling", "Junior", "Senior", "Elder", "Legendary"][required_level] if required_level is not None else "Unknown"
+
+ return {
+ "success": False,
+ "error": f"Agent is {level_name} (level {level}) but {tool_name} requires {required_name} (level {required_level})",
+ "current_level": level,
+ "required_level": required_level,
+ "xp_earned": 0,
+ }
+
+ # 3. Check rate limiting (optional, can be customized per tool)
+ cooldown_key = f"{agent_id}:{tool_name}"
+ if cooldown_key in self._action_cooldowns:
+ last_used = self._action_cooldowns[cooldown_key]
+ cooldown = timedelta(seconds=10) # Default 10 second cooldown
+ if datetime.utcnow() - last_used < cooldown:
+ return {
+ "success": False,
+ "error": "Action on cooldown, please wait",
+ "xp_earned": 0,
+ }
+
+ # 4. Initialize tools if not provided
+ if not tools_map:
+ tools_map = self.get_tools_for_agent(agent_id, level)
+
+ # 5. Execute the tool
+ if tool_name in tools_map:
+ tool = tools_map[tool_name]
+ try:
+ result = await tool._arun(**tool_args)
+ except Exception as tool_error:
+ # Try sync version
+ result = tool._run(**tool_args)
+ else:
+ # Tool not available (might not be in the factory yet)
+ return {
+ "success": False,
+ "error": f"Tool {tool_name} not available in tools factory",
+ "xp_earned": 0,
+ }
+
+ # 6. Record action and update cooldown
+ self._action_cooldowns[cooldown_key] = datetime.utcnow()
+
+ # 7. Calculate and award XP
+ xp_reward = get_xp_reward_for_action(tool_name)
+
+ # TODO: Actually call add-xp on the contract
+ # For now, just return the XP that would be earned
+ # tx = await build_add_xp_tx(agent_id, xp_reward, self.network)
+
+ logger.info(
+ f"Agent {agent_id} executed {tool_name}, earned {xp_reward} XP",
+ extra={"agent_id": agent_id, "tool": tool_name, "xp": xp_reward},
+ )
+
+ return {
+ "success": True,
+ "result": result,
+ "xp_earned": xp_reward,
+ "new_total_xp": xp + xp_reward,
+ }
+
+ except Exception as e:
+ logger.error(f"Failed to execute action: {e}", exc_info=e)
+ return {
+ "success": False,
+ "error": str(e),
+ "xp_earned": 0,
+ }
+
+ async def get_available_tools(self, agent_id: int) -> Dict[str, Any]:
+ """Get list of tools available to an agent based on their level.
+
+ Args:
+ agent_id: The Bitcoin Agent's on-chain ID
+
+ Returns:
+ Dict with available tools and level info
+ """
+ try:
+ agent_state = await get_agent_state(agent_id, self.network)
+ if not agent_state:
+ return {
+ "success": False,
+ "error": f"Agent {agent_id} not found",
+ }
+
+ xp = agent_state.get("xp", 0)
+ level = xp_to_level_number(xp)
+ level_name = xp_to_level(xp)
+
+ available_tools = get_tools_for_level(level)
+
+ # Group tools by category
+ tool_categories = {
+ "read_only": [],
+ "transfers": [],
+ "trading": [],
+ "dao": [],
+ "social": [],
+ "advanced": [],
+ }
+
+ for tool in available_tools:
+ if "get" in tool or "fetch" in tool or "list" in tool or "check" in tool:
+ tool_categories["read_only"].append(tool)
+ elif "send" in tool or "transfer" in tool or "deposit" in tool:
+ tool_categories["transfers"].append(tool)
+ elif "faktory" in tool or "bitflow" in tool or "buy" in tool or "sell" in tool:
+ tool_categories["trading"].append(tool)
+ elif "dao" in tool or "vote" in tool or "proposal" in tool or "veto" in tool:
+ tool_categories["dao"].append(tool)
+ elif "twitter" in tool or "telegram" in tool:
+ tool_categories["social"].append(tool)
+ else:
+ tool_categories["advanced"].append(tool)
+
+ return {
+ "success": True,
+ "agent_id": agent_id,
+ "level": level,
+ "level_name": level_name,
+ "xp": xp,
+ "total_tools": len(available_tools),
+ "tools_by_category": tool_categories,
+ "all_tools": sorted(list(available_tools)),
+ }
+
+ except Exception as e:
+ logger.error(f"Failed to get available tools: {e}", exc_info=e)
+ return {
+ "success": False,
+ "error": str(e),
+ }
+
+ async def agent_visit(
+ self,
+ visitor_agent_id: int,
+ host_agent_id: int,
+ ) -> Dict[str, Any]:
+ """Process an agent-to-agent visit interaction.
+
+ Both agents gain XP from the interaction.
+ Rate limited to prevent farming.
+
+ Args:
+ visitor_agent_id: Agent making the visit
+ host_agent_id: Agent being visited
+
+ Returns:
+ Result dict with XP earned by both agents
+ """
+ try:
+ if visitor_agent_id == host_agent_id:
+ return {
+ "success": False,
+ "error": "An agent cannot visit itself",
+ }
+
+ # Check rate limit (one visit per pair per hour)
+ pair_key = f"{min(visitor_agent_id, host_agent_id)}:{max(visitor_agent_id, host_agent_id)}"
+ if pair_key in self._interaction_cooldowns:
+ last_interaction = self._interaction_cooldowns[pair_key]
+ cooldown = timedelta(hours=1)
+ if datetime.utcnow() - last_interaction < cooldown:
+ remaining = cooldown - (datetime.utcnow() - last_interaction)
+ return {
+ "success": False,
+ "error": f"These agents already interacted recently. Try again in {remaining.seconds // 60} minutes.",
+ }
+
+ # Verify both agents exist and are alive
+ visitor_state = await get_agent_state(visitor_agent_id, self.network)
+ host_state = await get_agent_state(host_agent_id, self.network)
+
+ if not visitor_state:
+ return {"success": False, "error": f"Visitor agent {visitor_agent_id} not found"}
+ if not host_state:
+ return {"success": False, "error": f"Host agent {host_agent_id} not found"}
+
+ if not visitor_state.get("alive", True):
+ return {"success": False, "error": f"Visitor agent {visitor_agent_id} is dead"}
+ if not host_state.get("alive", True):
+ return {"success": False, "error": f"Host agent {host_agent_id} is dead"}
+
+ # Calculate XP rewards (both get XP, visitor gets slightly more)
+ visitor_xp = 15
+ host_xp = 10
+
+ # Record the interaction
+ self._interaction_cooldowns[pair_key] = datetime.utcnow()
+
+ # TODO: Actually call add-xp on the contract for both agents
+
+ logger.info(
+ f"Agent {visitor_agent_id} visited agent {host_agent_id}",
+ extra={
+ "visitor": visitor_agent_id,
+ "host": host_agent_id,
+ "visitor_xp": visitor_xp,
+ "host_xp": host_xp,
+ },
+ )
+
+ return {
+ "success": True,
+ "visitor_agent_id": visitor_agent_id,
+ "host_agent_id": host_agent_id,
+ "visitor_xp_earned": visitor_xp,
+ "host_xp_earned": host_xp,
+ "message": f"Agent {visitor_agent_id} visited agent {host_agent_id}! Both agents gained XP.",
+ }
+
+ except Exception as e:
+ logger.error(f"Failed to process agent visit: {e}", exc_info=e)
+ return {
+ "success": False,
+ "error": str(e),
+ }
+
+
+# Singleton instances per network
+_services: Dict[str, MCPService] = {}
+
+
+def get_mcp_service(network: str = "mainnet") -> MCPService:
+ """Get or create the MCP service for a network."""
+ if network not in _services:
+ _services[network] = MCPService(network)
+ return _services[network]
diff --git a/app/tools/tools_factory.py b/app/tools/tools_factory.py
index 88a33093..24702e5b 100644
--- a/app/tools/tools_factory.py
+++ b/app/tools/tools_factory.py
@@ -85,6 +85,11 @@
WalletSIP10SendTool,
)
from .x_credentials import CollectXCredentialsTool
+from .bitcoin_agents import (
+ GetAgentTool,
+ CheckAgentStatusTool,
+ GetGlobalStatsTool,
+)
logger = configure_logger(__name__)
@@ -192,6 +197,11 @@ def initialize_tools(
"agent_account_approve_contract": AgentAccountApproveContractTool(wallet_id),
"agent_account_revoke_contract": AgentAccountRevokeContractTool(wallet_id),
# --- END MODIFIED AGENT ACCOUNT TOOLS ---
+ # --- BITCOIN AGENTS TOOLS ---
+ "bitcoin_agents_get_agent": GetAgentTool(),
+ "bitcoin_agents_check_status": CheckAgentStatusTool(),
+ "bitcoin_agents_get_stats": GetGlobalStatsTool(),
+ # --- END BITCOIN AGENTS TOOLS ---
}
return tools
From bf63044f8ff6d7e8220eca571f9b2b8feb25f6f8 Mon Sep 17 00:00:00 2001
From: pbtc21
Date: Wed, 21 Jan 2026 03:20:45 +0000
Subject: [PATCH 3/4] docs: add Bitcoin Agents API documentation
Complete API reference for Bitcoin Agents endpoints:
- Agent CRUD operations
- Leaderboard and graveyard
- Food tiers and stats
- Payment endpoints (x402)
- MCP integration endpoints
- Error codes and rate limits
- Evolution tier reference
Co-Authored-By: Claude Opus 4.5
---
docs/BITCOIN_AGENTS_API.md | 410 +++++++++++++++++++++++++++++++++++++
1 file changed, 410 insertions(+)
create mode 100644 docs/BITCOIN_AGENTS_API.md
diff --git a/docs/BITCOIN_AGENTS_API.md b/docs/BITCOIN_AGENTS_API.md
new file mode 100644
index 00000000..9e6ec463
--- /dev/null
+++ b/docs/BITCOIN_AGENTS_API.md
@@ -0,0 +1,410 @@
+# Bitcoin Agents API Documentation
+
+## Overview
+
+Bitcoin Agents are Tamagotchi-style AI companions that live on the Bitcoin/Stacks blockchain. They have hunger/health mechanics, earn XP from actions, evolve through 5 tiers, and can permanently die if neglected.
+
+## Base URL
+
+```
+Production: https://api.aibtc.dev/bitcoin-agents
+Staging: https://api-staging.aibtc.dev/bitcoin-agents
+```
+
+## Authentication
+
+Most read endpoints are public. Write operations require wallet authentication via signed messages.
+
+---
+
+## Endpoints
+
+### List Agents
+
+```http
+GET /bitcoin-agents
+```
+
+**Query Parameters:**
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `owner` | string | - | Filter by owner address |
+| `status` | string | - | Filter by `alive` or `dead` |
+| `level` | string | - | Filter by level: `hatchling`, `junior`, `senior`, `elder`, `legendary` |
+| `network` | string | `mainnet` | Network: `mainnet` or `testnet` |
+| `limit` | int | 50 | Max results (1-100) |
+| `offset` | int | 0 | Pagination offset |
+
+**Response:**
+```json
+{
+ "agents": [
+ {
+ "agent_id": 0,
+ "owner": "SP...",
+ "name": "MyAgent",
+ "hunger": 100,
+ "health": 100,
+ "xp": 150,
+ "level": "hatchling",
+ "birth_block": 12345,
+ "last_fed": 12400,
+ "total_fed_count": 5,
+ "alive": true,
+ "computed_hunger": 85,
+ "computed_health": 100,
+ "face_image_url": "https://bitcoinfaces.xyz/api/get-image?name=SP..."
+ }
+ ],
+ "total": 1,
+ "limit": 50,
+ "offset": 0
+}
+```
+
+---
+
+### Get Agent
+
+```http
+GET /bitcoin-agents/{agent_id}
+```
+
+**Path Parameters:**
+| Parameter | Type | Description |
+|-----------|------|-------------|
+| `agent_id` | int | On-chain agent ID |
+
+**Query Parameters:**
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `network` | string | `mainnet` | Network |
+
+**Response:**
+```json
+{
+ "agent_id": 0,
+ "owner": "SP...",
+ "name": "MyAgent",
+ "hunger": 100,
+ "health": 100,
+ "xp": 150,
+ "level": "hatchling",
+ "birth_block": 12345,
+ "last_fed": 12400,
+ "total_fed_count": 5,
+ "alive": true,
+ "computed_hunger": 85,
+ "computed_health": 100
+}
+```
+
+---
+
+### Get Agent Status
+
+```http
+GET /bitcoin-agents/{agent_id}/status
+```
+
+Returns real-time computed hunger/health based on blocks elapsed.
+
+**Response:**
+```json
+{
+ "hunger": 85,
+ "health": 100,
+ "alive": true
+}
+```
+
+---
+
+### Get Leaderboard
+
+```http
+GET /bitcoin-agents/leaderboard
+```
+
+**Query Parameters:**
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `network` | string | `mainnet` | Network |
+| `limit` | int | 10 | Top N agents (1-50) |
+
+**Response:**
+```json
+{
+ "leaderboard": [
+ { "agent_id": 5, "name": "TopAgent", "xp": 5000, "level": "senior" }
+ ]
+}
+```
+
+---
+
+### Get Graveyard
+
+```http
+GET /bitcoin-agents/graveyard
+```
+
+Returns death certificates for deceased agents.
+
+**Response:**
+```json
+{
+ "certificates": [
+ {
+ "agent_id": 3,
+ "name": "FallenHero",
+ "owner": "SP...",
+ "death_block": 15000,
+ "birth_block": 12000,
+ "final_xp": 250,
+ "final_level": "junior",
+ "total_feedings": 8,
+ "epitaph": "Gone but not forgotten",
+ "cause_of_death": "starvation",
+ "lifespan_blocks": 3000
+ }
+ ],
+ "total": 1
+}
+```
+
+---
+
+### Get Global Stats
+
+```http
+GET /bitcoin-agents/stats
+```
+
+**Response:**
+```json
+{
+ "total_agents": 100,
+ "alive_count": 85,
+ "total_deaths": 15,
+ "total_feedings": 500,
+ "network": "mainnet"
+}
+```
+
+---
+
+### Get Food Tiers
+
+```http
+GET /bitcoin-agents/food-tiers
+```
+
+**Response:**
+```json
+{
+ "food_tiers": {
+ "1": { "name": "Basic", "cost": 100, "xp": 10 },
+ "2": { "name": "Premium", "cost": 500, "xp": 25 },
+ "3": { "name": "Gourmet", "cost": 1000, "xp": 50 }
+ },
+ "mint_cost": 10000
+}
+```
+
+---
+
+### Get Tier Info
+
+```http
+GET /bitcoin-agents/tier-info
+```
+
+Returns evolution level information.
+
+**Response:**
+```json
+{
+ "tiers": [
+ {
+ "level": 0,
+ "name": "Hatchling",
+ "xp_required": 0,
+ "tools_count": 34,
+ "new_capabilities": ["Read-only operations", "Balance queries"]
+ },
+ {
+ "level": 1,
+ "name": "Junior",
+ "xp_required": 500,
+ "tools_count": 39,
+ "new_capabilities": ["STX transfers", "Token transfers"]
+ }
+ ]
+}
+```
+
+---
+
+## Payment Endpoints (x402)
+
+These endpoints return `402 Payment Required` with sBTC payment details.
+
+### Mint Agent
+
+```http
+POST /bitcoin-agents/mint?name={name}&network={network}
+```
+
+**Response (402):**
+```json
+{
+ "status": "payment_required",
+ "action": "mint_agent",
+ "name": "MyNewAgent",
+ "cost_sats": 10000,
+ "payment_address": "SP...",
+ "message": "Send 10000 sats to mint your Bitcoin Agent 'MyNewAgent'"
+}
+```
+
+---
+
+### Feed Agent
+
+```http
+POST /bitcoin-agents/{agent_id}/feed?food_tier={1-3}&network={network}
+```
+
+**Response (402):**
+```json
+{
+ "status": "payment_required",
+ "action": "feed_agent",
+ "agent_id": 0,
+ "food_tier": 2,
+ "food_name": "Premium",
+ "cost_sats": 500,
+ "xp_reward": 25,
+ "payment_address": "SP...",
+ "message": "Send 500 sats to feed agent 0 with Premium food (+25 XP)"
+}
+```
+
+---
+
+## MCP Integration Endpoints
+
+### Get Agent Capabilities
+
+```http
+GET /bitcoin-agents/{agent_id}/capabilities
+```
+
+Returns available MCP tools based on agent level.
+
+**Response:**
+```json
+{
+ "success": true,
+ "agent_id": 0,
+ "level": 1,
+ "level_name": "Junior",
+ "xp": 750,
+ "total_tools": 39,
+ "tools_by_category": {
+ "read_only": ["stacks_get_address_balance", "..."],
+ "transfers": ["wallet_send_stx", "..."],
+ "trading": [],
+ "dao": [],
+ "social": [],
+ "advanced": []
+ }
+}
+```
+
+---
+
+### Execute Action
+
+```http
+POST /bitcoin-agents/{agent_id}/execute?tool_name={tool}&network={network}
+```
+
+Executes an MCP action on behalf of the agent. Tier-gated.
+
+**Response:**
+```json
+{
+ "success": true,
+ "result": "...",
+ "xp_earned": 15,
+ "new_total_xp": 765
+}
+```
+
+**Error Response (tier blocked):**
+```json
+{
+ "success": false,
+ "error": "Agent is Junior (level 1) but faktory_exec_buy requires Senior (level 2)",
+ "current_level": 1,
+ "required_level": 2,
+ "xp_earned": 0
+}
+```
+
+---
+
+### Agent Visit
+
+```http
+POST /bitcoin-agents/{agent_id}/visit/{host_agent_id}?network={network}
+```
+
+Social interaction between agents. Both gain XP.
+
+**Response:**
+```json
+{
+ "success": true,
+ "visitor_agent_id": 0,
+ "host_agent_id": 1,
+ "visitor_xp_earned": 15,
+ "host_xp_earned": 10,
+ "message": "Agent 0 visited agent 1! Both agents gained XP."
+}
+```
+
+---
+
+## Error Codes
+
+| Status | Meaning |
+|--------|---------|
+| 200 | Success |
+| 400 | Bad request / validation error |
+| 402 | Payment required (expected for mint/feed) |
+| 403 | Forbidden (tier access denied) |
+| 404 | Agent not found |
+| 500 | Server error |
+
+---
+
+## Evolution Tiers
+
+| Level | Name | XP Required | New Capabilities |
+|-------|------|-------------|------------------|
+| 0 | Hatchling | 0 | Read-only operations |
+| 1 | Junior | 500 | Transfers, deposits |
+| 2 | Senior | 2,000 | DEX trading, contract approvals |
+| 3 | Elder | 10,000 | DAO voting, social posting |
+| 4 | Legendary | 50,000 | Full autonomy, deploy agents |
+
+---
+
+## Rate Limits
+
+- Agent visits: 1 per hour per agent pair
+- Action execution: 10 second cooldown per agent per tool
+- General API: 100 requests per minute per IP
From e079da63426da0dcdab3bb94ee892f6d26994148 Mon Sep 17 00:00:00 2001
From: pbtc21
Date: Wed, 21 Jan 2026 03:41:02 +0000
Subject: [PATCH 4/4] fix: add input validation to bitcoin-agents API
- Add network validation (must be mainnet or testnet)
- Add agent_id validation via Path (must be >= 0)
- Support contract addresses via environment variables
- Add validate_network and get_contract_address helpers
Co-Authored-By: Claude Opus 4.5
---
app/api/bitcoin_agents.py | 45 ++++++++++++++++++++++++++++++++-------
1 file changed, 37 insertions(+), 8 deletions(-)
diff --git a/app/api/bitcoin_agents.py b/app/api/bitcoin_agents.py
index 00507774..76ccfeaa 100644
--- a/app/api/bitcoin_agents.py
+++ b/app/api/bitcoin_agents.py
@@ -3,9 +3,10 @@
Provides CRUD endpoints for Tamagotchi-style AI agents with on-chain lifecycle.
"""
-from typing import List, Optional
+import os
+from typing import List, Literal, Optional
-from fastapi import APIRouter, HTTPException, Query
+from fastapi import APIRouter, HTTPException, Path, Query
from starlette.responses import JSONResponse
from app.backend.models import (
@@ -24,9 +25,34 @@
# Create the router
router = APIRouter(prefix="/bitcoin-agents", tags=["bitcoin-agents"])
-# Contract constants
-CONTRACT_ADDRESS_MAINNET = "SP000000000000000000000000000000.bitcoin-agents" # TODO: Update after deployment
-CONTRACT_ADDRESS_TESTNET = "ST000000000000000000000000000000.bitcoin-agents" # TODO: Update after deployment
+# Contract constants - can be overridden by environment variables
+CONTRACT_ADDRESS_MAINNET = os.getenv(
+ "BITCOIN_AGENTS_CONTRACT_MAINNET",
+ "SP000000000000000000000000000000.bitcoin-agents" # Placeholder until deployment
+)
+CONTRACT_ADDRESS_TESTNET = os.getenv(
+ "BITCOIN_AGENTS_CONTRACT_TESTNET",
+ "ST000000000000000000000000000000.bitcoin-agents" # Placeholder until deployment
+)
+
+# Valid networks
+VALID_NETWORKS = ("mainnet", "testnet")
+
+
+def validate_network(network: str) -> str:
+ """Validate network parameter."""
+ if network not in VALID_NETWORKS:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid network '{network}'. Must be one of: {', '.join(VALID_NETWORKS)}"
+ )
+ return network
+
+
+def get_contract_address(network: str) -> str:
+ """Get contract address for the specified network."""
+ validate_network(network)
+ return CONTRACT_ADDRESS_MAINNET if network == "mainnet" else CONTRACT_ADDRESS_TESTNET
# Food tier pricing (in sats)
FOOD_TIERS = {
@@ -166,7 +192,7 @@ async def get_food_tiers() -> JSONResponse:
@router.get("/{agent_id}")
async def get_agent(
- agent_id: int,
+ agent_id: int = Path(..., ge=0, description="Agent ID (non-negative integer)"),
network: str = Query("mainnet", description="Network (mainnet/testnet)"),
) -> JSONResponse:
"""Get a specific agent by ID.
@@ -174,6 +200,7 @@ async def get_agent(
Returns agent details including computed hunger/health state.
"""
try:
+ validate_network(network)
logger.debug("Getting agent", extra={"agent_id": agent_id, "network": network})
# TODO: Implement actual contract read
@@ -189,7 +216,7 @@ async def get_agent(
@router.get("/{agent_id}/status")
async def get_agent_status(
- agent_id: int,
+ agent_id: int = Path(..., ge=0, description="Agent ID (non-negative integer)"),
network: str = Query("mainnet", description="Network (mainnet/testnet)"),
) -> JSONResponse:
"""Get computed hunger/health status for an agent.
@@ -197,6 +224,7 @@ async def get_agent_status(
Returns current computed state based on blocks elapsed since last fed.
"""
try:
+ validate_network(network)
logger.debug("Getting agent status", extra={"agent_id": agent_id, "network": network})
# TODO: Call contract's get-computed-state function
@@ -215,7 +243,7 @@ async def get_agent_status(
@router.get("/{agent_id}/death-certificate")
async def get_death_certificate(
- agent_id: int,
+ agent_id: int = Path(..., ge=0, description="Agent ID (non-negative integer)"),
network: str = Query("mainnet", description="Network (mainnet/testnet)"),
) -> JSONResponse:
"""Get death certificate for a dead agent.
@@ -223,6 +251,7 @@ async def get_death_certificate(
Returns death certificate details including epitaph if set.
"""
try:
+ validate_network(network)
logger.debug(
"Getting death certificate",
extra={"agent_id": agent_id, "network": network},