diff --git a/app/api/bitcoin_agents.py b/app/api/bitcoin_agents.py new file mode 100644 index 00000000..76ccfeaa --- /dev/null +++ b/app/api/bitcoin_agents.py @@ -0,0 +1,595 @@ +"""Bitcoin Agents API router. + +Provides CRUD endpoints for Tamagotchi-style AI agents with on-chain lifecycle. +""" + +import os +from typing import List, Literal, Optional + +from fastapi import APIRouter, HTTPException, Path, Query +from starlette.responses import JSONResponse + +from app.backend.models import ( + BitcoinAgent, + BitcoinAgentFilter, + BitcoinAgentLevel, + BitcoinAgentStatus, + DeathCertificate, + DeathCertificateFilter, +) +from app.lib.logger import configure_logger + +# Configure logger +logger = configure_logger(__name__) + +# Create the router +router = APIRouter(prefix="/bitcoin-agents", tags=["bitcoin-agents"]) + +# Contract constants - can be overridden by environment variables +CONTRACT_ADDRESS_MAINNET = os.getenv( + "BITCOIN_AGENTS_CONTRACT_MAINNET", + "SP000000000000000000000000000000.bitcoin-agents" # Placeholder until deployment +) +CONTRACT_ADDRESS_TESTNET = os.getenv( + "BITCOIN_AGENTS_CONTRACT_TESTNET", + "ST000000000000000000000000000000.bitcoin-agents" # Placeholder until deployment +) + +# Valid networks +VALID_NETWORKS = ("mainnet", "testnet") + + +def validate_network(network: str) -> str: + """Validate network parameter.""" + if network not in VALID_NETWORKS: + raise HTTPException( + status_code=400, + detail=f"Invalid network '{network}'. Must be one of: {', '.join(VALID_NETWORKS)}" + ) + return network + + +def get_contract_address(network: str) -> str: + """Get contract address for the specified network.""" + validate_network(network) + return CONTRACT_ADDRESS_MAINNET if network == "mainnet" else CONTRACT_ADDRESS_TESTNET + +# Food tier pricing (in sats) +FOOD_TIERS = { + 1: {"name": "Basic", "cost": 100, "xp": 10}, + 2: {"name": "Premium", "cost": 500, "xp": 25}, + 3: {"name": "Gourmet", "cost": 1000, "xp": 50}, +} + +# Mint cost (in sats) +MINT_COST = 10000 + + +@router.get("") +async def list_agents( + owner: Optional[str] = Query(None, description="Filter by owner address"), + status: Optional[BitcoinAgentStatus] = Query(None, description="Filter by status (alive/dead)"), + level: Optional[BitcoinAgentLevel] = Query(None, description="Filter by evolution level"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), + limit: int = Query(50, ge=1, le=100, description="Max results to return"), + offset: int = Query(0, ge=0, description="Offset for pagination"), +) -> JSONResponse: + """List all Bitcoin Agents with optional filters. + + Returns agents sorted by XP (descending). + """ + try: + logger.debug( + "Listing bitcoin agents", + extra={"owner": owner, "status": status, "level": level, "network": network}, + ) + + # TODO: Implement actual contract reads and database caching + # For now, return empty list as placeholder + agents: List[dict] = [] + + return JSONResponse( + content={ + "agents": agents, + "total": len(agents), + "limit": limit, + "offset": offset, + } + ) + + except Exception as e: + logger.error("Failed to list bitcoin agents", extra={"error": str(e)}, exc_info=e) + raise HTTPException(status_code=500, detail=f"Failed to list agents: {str(e)}") + + +@router.get("/leaderboard") +async def get_leaderboard( + network: str = Query("mainnet", description="Network (mainnet/testnet)"), + limit: int = Query(10, ge=1, le=50, description="Top N agents"), +) -> JSONResponse: + """Get top agents by XP. + + Returns the leaderboard of highest XP agents. + """ + try: + logger.debug("Getting leaderboard", extra={"network": network, "limit": limit}) + + # TODO: Implement actual leaderboard query + leaderboard: List[dict] = [] + + return JSONResponse(content={"leaderboard": leaderboard, "network": network}) + + except Exception as e: + logger.error("Failed to get leaderboard", extra={"error": str(e)}, exc_info=e) + raise HTTPException(status_code=500, detail=f"Failed to get leaderboard: {str(e)}") + + +@router.get("/graveyard") +async def get_graveyard( + network: str = Query("mainnet", description="Network (mainnet/testnet)"), + limit: int = Query(50, ge=1, le=100, description="Max results"), + offset: int = Query(0, ge=0, description="Offset for pagination"), +) -> JSONResponse: + """Get all dead agents (graveyard). + + Returns death certificates sorted by death block (most recent first). + """ + try: + logger.debug("Getting graveyard", extra={"network": network}) + + # TODO: Implement actual graveyard query + certificates: List[dict] = [] + + return JSONResponse( + content={ + "certificates": certificates, + "total": len(certificates), + "limit": limit, + "offset": offset, + } + ) + + except Exception as e: + logger.error("Failed to get graveyard", extra={"error": str(e)}, exc_info=e) + raise HTTPException(status_code=500, detail=f"Failed to get graveyard: {str(e)}") + + +@router.get("/stats") +async def get_global_stats( + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Get global Bitcoin Agents statistics. + + Returns total agents, deaths, feedings, alive count. + """ + try: + logger.debug("Getting global stats", extra={"network": network}) + + # TODO: Implement actual stats query from contract + stats = { + "total_agents": 0, + "total_deaths": 0, + "total_feedings": 0, + "alive_count": 0, + "network": network, + } + + return JSONResponse(content=stats) + + except Exception as e: + logger.error("Failed to get stats", extra={"error": str(e)}, exc_info=e) + raise HTTPException(status_code=500, detail=f"Failed to get stats: {str(e)}") + + +@router.get("/food-tiers") +async def get_food_tiers() -> JSONResponse: + """Get available food tiers and pricing. + + Returns food tier options with costs and XP rewards. + """ + return JSONResponse(content={"food_tiers": FOOD_TIERS, "mint_cost": MINT_COST}) + + +@router.get("/{agent_id}") +async def get_agent( + agent_id: int = Path(..., ge=0, description="Agent ID (non-negative integer)"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Get a specific agent by ID. + + Returns agent details including computed hunger/health state. + """ + try: + validate_network(network) + logger.debug("Getting agent", extra={"agent_id": agent_id, "network": network}) + + # TODO: Implement actual contract read + # For now, return 404 as placeholder + raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found") + + except HTTPException: + raise + except Exception as e: + logger.error("Failed to get agent", extra={"agent_id": agent_id, "error": str(e)}, exc_info=e) + raise HTTPException(status_code=500, detail=f"Failed to get agent: {str(e)}") + + +@router.get("/{agent_id}/status") +async def get_agent_status( + agent_id: int = Path(..., ge=0, description="Agent ID (non-negative integer)"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Get computed hunger/health status for an agent. + + Returns current computed state based on blocks elapsed since last fed. + """ + try: + validate_network(network) + logger.debug("Getting agent status", extra={"agent_id": agent_id, "network": network}) + + # TODO: Call contract's get-computed-state function + raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found") + + except HTTPException: + raise + except Exception as e: + logger.error( + "Failed to get agent status", + extra={"agent_id": agent_id, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to get agent status: {str(e)}") + + +@router.get("/{agent_id}/death-certificate") +async def get_death_certificate( + agent_id: int = Path(..., ge=0, description="Agent ID (non-negative integer)"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Get death certificate for a dead agent. + + Returns death certificate details including epitaph if set. + """ + try: + validate_network(network) + logger.debug( + "Getting death certificate", + extra={"agent_id": agent_id, "network": network}, + ) + + # TODO: Call contract's get-death-certificate function + raise HTTPException(status_code=404, detail=f"Death certificate for agent {agent_id} not found") + + except HTTPException: + raise + except Exception as e: + logger.error( + "Failed to get death certificate", + extra={"agent_id": agent_id, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to get death certificate: {str(e)}") + + +# ============================================================================= +# x402 Payment Endpoints (return 402 with payment requirements) +# ============================================================================= + + +@router.post("/mint") +async def mint_agent_request( + name: str = Query(..., min_length=1, max_length=64, description="Agent name"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Request to mint a new agent. + + Returns 402 Payment Required with sBTC payment details. + After payment is confirmed, execute the mint transaction. + """ + try: + logger.info("Mint agent request", extra={"name": name, "network": network}) + + # Return 402 with payment requirements + contract_address = CONTRACT_ADDRESS_MAINNET if network == "mainnet" else CONTRACT_ADDRESS_TESTNET + + payment_details = { + "status": "payment_required", + "action": "mint_agent", + "name": name, + "cost_sats": MINT_COST, + "payment_address": contract_address, # TODO: Use actual payment address + "network": network, + "message": f"Send {MINT_COST} sats to mint your Bitcoin Agent '{name}'", + } + + return JSONResponse(status_code=402, content=payment_details) + + except Exception as e: + logger.error("Failed to process mint request", extra={"name": name, "error": str(e)}, exc_info=e) + raise HTTPException(status_code=500, detail=f"Failed to process mint request: {str(e)}") + + +@router.post("/{agent_id}/feed") +async def feed_agent_request( + agent_id: int, + food_tier: int = Query(..., ge=1, le=3, description="Food tier (1=Basic, 2=Premium, 3=Gourmet)"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Request to feed an agent. + + Returns 402 Payment Required with sBTC payment details. + After payment is confirmed, execute the feed transaction. + """ + try: + logger.info( + "Feed agent request", + extra={"agent_id": agent_id, "food_tier": food_tier, "network": network}, + ) + + if food_tier not in FOOD_TIERS: + raise HTTPException(status_code=400, detail=f"Invalid food tier: {food_tier}") + + food_info = FOOD_TIERS[food_tier] + contract_address = CONTRACT_ADDRESS_MAINNET if network == "mainnet" else CONTRACT_ADDRESS_TESTNET + + payment_details = { + "status": "payment_required", + "action": "feed_agent", + "agent_id": agent_id, + "food_tier": food_tier, + "food_name": food_info["name"], + "cost_sats": food_info["cost"], + "xp_reward": food_info["xp"], + "payment_address": contract_address, # TODO: Use actual payment address + "network": network, + "message": f"Send {food_info['cost']} sats to feed agent {agent_id} with {food_info['name']} food (+{food_info['xp']} XP)", + } + + return JSONResponse(status_code=402, content=payment_details) + + except HTTPException: + raise + except Exception as e: + logger.error( + "Failed to process feed request", + extra={"agent_id": agent_id, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to process feed request: {str(e)}") + + +@router.post("/{agent_id}/check-death") +async def check_agent_death( + agent_id: int, + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Check if an agent should die and process death if so. + + Anyone can call this to process an agent's death when health reaches 0. + Returns whether the agent died. + """ + try: + logger.info("Check death request", extra={"agent_id": agent_id, "network": network}) + + # TODO: Call contract's check-death function + # This is a public function anyone can call + + return JSONResponse( + content={ + "agent_id": agent_id, + "died": False, # TODO: Actual result from contract + "message": "Agent is still alive", + } + ) + + except Exception as e: + logger.error( + "Failed to check death", + extra={"agent_id": agent_id, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to check death: {str(e)}") + + +@router.post("/{agent_id}/epitaph") +async def write_epitaph_request( + agent_id: int, + epitaph: str = Query(..., min_length=1, max_length=256, description="Memorial text"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Write an epitaph for a dead agent. + + Only the owner can write an epitaph, and only once. + """ + try: + logger.info( + "Write epitaph request", + extra={"agent_id": agent_id, "epitaph_length": len(epitaph), "network": network}, + ) + + # TODO: Verify ownership and call contract's write-epitaph function + raise HTTPException( + status_code=501, + detail="Epitaph writing not yet implemented. Requires wallet signature.", + ) + + except HTTPException: + raise + except Exception as e: + logger.error( + "Failed to write epitaph", + extra={"agent_id": agent_id, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to write epitaph: {str(e)}") + + +# ============================================================================= +# MCP Integration Endpoints +# ============================================================================= + + +@router.get("/{agent_id}/capabilities") +async def get_agent_capabilities( + agent_id: int, + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Get available MCP tools for an agent based on their level. + + Returns tools grouped by category that the agent can use. + """ + try: + from app.services.bitcoin_agents.mcp_service import get_mcp_service + + logger.debug("Getting agent capabilities", extra={"agent_id": agent_id, "network": network}) + + mcp_service = get_mcp_service(network) + result = await mcp_service.get_available_tools(agent_id) + + if not result.get("success"): + raise HTTPException(status_code=404, detail=result.get("error", "Unknown error")) + + return JSONResponse(content=result) + + except HTTPException: + raise + except Exception as e: + logger.error( + "Failed to get agent capabilities", + extra={"agent_id": agent_id, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to get capabilities: {str(e)}") + + +@router.post("/{agent_id}/execute") +async def execute_agent_action( + agent_id: int, + tool_name: str = Query(..., description="Name of the tool to execute"), + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Execute an MCP action on behalf of an agent. + + Checks tier-based access before execution. + Awards XP on successful completion. + + Note: Tool arguments should be passed in the request body for complex operations. + This endpoint is for simple tool invocations. + """ + try: + from app.services.bitcoin_agents.mcp_service import get_mcp_service + + logger.info( + "Execute agent action", + extra={"agent_id": agent_id, "tool": tool_name, "network": network}, + ) + + mcp_service = get_mcp_service(network) + + # Execute with empty args - full implementation would parse request body + # Tools are auto-initialized based on agent level via get_tools_for_agent + result = await mcp_service.execute_action( + agent_id=agent_id, + tool_name=tool_name, + tool_args={}, + tools_map=None, # Auto-initialized by MCP service + ) + + if not result.get("success"): + status_code = 403 if "requires" in result.get("error", "") else 400 + return JSONResponse(status_code=status_code, content=result) + + return JSONResponse(content=result) + + except Exception as e: + logger.error( + "Failed to execute agent action", + extra={"agent_id": agent_id, "tool": tool_name, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to execute action: {str(e)}") + + +@router.post("/{agent_id}/visit/{host_agent_id}") +async def agent_visit( + agent_id: int, + host_agent_id: int, + network: str = Query("mainnet", description="Network (mainnet/testnet)"), +) -> JSONResponse: + """Have one agent visit another agent. + + Both agents gain XP from the interaction. + Rate limited to prevent farming (once per hour per pair). + """ + try: + from app.services.bitcoin_agents.mcp_service import get_mcp_service + + logger.info( + "Agent visit", + extra={"visitor": agent_id, "host": host_agent_id, "network": network}, + ) + + mcp_service = get_mcp_service(network) + result = await mcp_service.agent_visit(agent_id, host_agent_id) + + if not result.get("success"): + return JSONResponse(status_code=400, content=result) + + return JSONResponse(content=result) + + except Exception as e: + logger.error( + "Failed to process agent visit", + extra={"visitor": agent_id, "host": host_agent_id, "error": str(e)}, + exc_info=e, + ) + raise HTTPException(status_code=500, detail=f"Failed to process visit: {str(e)}") + + +@router.get("/tier-info") +async def get_tier_info() -> JSONResponse: + """Get information about evolution tiers and their capabilities. + + Returns XP thresholds and tools available at each tier. + """ + from app.services.bitcoin_agents.mcp_service import TIER_CAPABILITIES, get_tools_for_level + + tiers = [ + { + "level": 0, + "name": "Hatchling", + "xp_required": 0, + "tools_count": len(get_tools_for_level(0)), + "new_capabilities": ["Read-only operations", "Balance queries", "Contract info"], + }, + { + "level": 1, + "name": "Junior", + "xp_required": 500, + "tools_count": len(get_tools_for_level(1)), + "new_capabilities": ["STX transfers", "Token transfers", "Deposits"], + }, + { + "level": 2, + "name": "Senior", + "xp_required": 2000, + "tools_count": len(get_tools_for_level(2)), + "new_capabilities": ["DEX trading (Faktory, Bitflow)", "Contract approvals"], + }, + { + "level": 3, + "name": "Elder", + "xp_required": 10000, + "tools_count": len(get_tools_for_level(3)), + "new_capabilities": ["DAO voting", "Proposals", "Social posting"], + }, + { + "level": 4, + "name": "Legendary", + "xp_required": 50000, + "tools_count": len(get_tools_for_level(4)), + "new_capabilities": ["Full autonomy", "Deploy agents", "Create proposals"], + }, + ] + + return JSONResponse(content={"tiers": tiers}) diff --git a/app/backend/models.py b/app/backend/models.py index 026eb726..eb1b3124 100644 --- a/app/backend/models.py +++ b/app/backend/models.py @@ -1095,3 +1095,134 @@ class JobCooldownBase(CustomBaseModel): class JobCooldown(JobCooldownBase): id: UUID updated_at: Optional[datetime] = None + + +# +# BITCOIN AGENTS (Tamagotchi-style AI agents) +# +class BitcoinAgentStatus(str, Enum): + """Status of a Bitcoin Agent.""" + + ALIVE = "alive" + DEAD = "dead" + + def __str__(self): + return self.value + + +class BitcoinAgentLevel(str, Enum): + """Evolution level of a Bitcoin Agent.""" + + HATCHLING = "hatchling" # 0-499 XP + JUNIOR = "junior" # 500-1999 XP + SENIOR = "senior" # 2000-9999 XP + ELDER = "elder" # 10000-49999 XP + LEGENDARY = "legendary" # 50000+ XP + + def __str__(self): + return self.value + + +class BitcoinAgentBase(CustomBaseModel): + """Base model for Bitcoin Agents.""" + + # On-chain data + agent_id: Optional[int] = None # On-chain agent ID + owner: Optional[str] = None # Stacks address + name: Optional[str] = None # Agent name (string-utf8 64) + hunger: Optional[int] = None # 0-100 + health: Optional[int] = None # 0-100 + xp: Optional[int] = None # Total XP earned + level: Optional[BitcoinAgentLevel] = BitcoinAgentLevel.HATCHLING + birth_block: Optional[int] = None # Block when minted + last_fed_block: Optional[int] = None # Block when last fed + total_fed_count: Optional[int] = None # Total times fed + status: Optional[BitcoinAgentStatus] = BitcoinAgentStatus.ALIVE + + # Computed state (from get-computed-state) + computed_hunger: Optional[int] = None + computed_health: Optional[int] = None + + # Bitcoin Face + face_svg_url: Optional[str] = None + face_image_url: Optional[str] = None + face_cached_at: Optional[datetime] = None + + # Contract info + contract_address: Optional[str] = None # bitcoin-agents.clar address + network: Optional[str] = "mainnet" # mainnet or testnet + + # Linked profile (optional) + profile_id: Optional[UUID] = None + + +class BitcoinAgentCreate(BitcoinAgentBase): + pass + + +class BitcoinAgent(BitcoinAgentBase): + id: UUID + created_at: datetime + updated_at: Optional[datetime] = None + + +class BitcoinAgentFilter(CustomBaseModel): + """Filter model for Bitcoin Agents.""" + + agent_id: Optional[int] = None + owner: Optional[str] = None + name: Optional[str] = None + level: Optional[BitcoinAgentLevel] = None + status: Optional[BitcoinAgentStatus] = None + profile_id: Optional[UUID] = None + network: Optional[str] = None + + # Range filters + xp_gte: Optional[int] = None + xp_lte: Optional[int] = None + hunger_lte: Optional[int] = None # Filter for hungry agents + health_lte: Optional[int] = None # Filter for unhealthy agents + + +# +# BITCOIN AGENT DEATH CERTIFICATES +# +class DeathCertificateBase(CustomBaseModel): + """Base model for Bitcoin Agent death certificates.""" + + agent_id: Optional[int] = None # On-chain agent ID + bitcoin_agent_db_id: Optional[UUID] = None # Reference to BitcoinAgent record + name: Optional[str] = None + owner: Optional[str] = None + birth_block: Optional[int] = None + death_block: Optional[int] = None + cause: Optional[str] = None # "starvation", "neglect" + final_level: Optional[BitcoinAgentLevel] = None + total_xp: Optional[int] = None + total_fed_count: Optional[int] = None + epitaph: Optional[str] = None # Owner-written memorial (string-utf8 256) + network: Optional[str] = "mainnet" + + +class DeathCertificateCreate(DeathCertificateBase): + pass + + +class DeathCertificate(DeathCertificateBase): + id: UUID + created_at: datetime + + +class DeathCertificateFilter(CustomBaseModel): + """Filter model for death certificates.""" + + agent_id: Optional[int] = None + owner: Optional[str] = None + final_level: Optional[BitcoinAgentLevel] = None + cause: Optional[str] = None + network: Optional[str] = None + + # Range filters for lifespan queries + death_block_gte: Optional[int] = None + death_block_lte: Optional[int] = None + total_xp_gte: Optional[int] = None diff --git a/app/main.py b/app/main.py index e61a5c9c..479de256 100644 --- a/app/main.py +++ b/app/main.py @@ -1,7 +1,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from app.api import agents, daos, tools, webhooks, profiles +from app.api import agents, bitcoin_agents, daos, tools, webhooks, profiles from app.config import config from app.lib.logger import configure_logger, setup_uvicorn_logging from app.middleware.logging import LoggingMiddleware @@ -42,6 +42,7 @@ async def health_check(): app.include_router(tools.router) app.include_router(webhooks.router) app.include_router(agents.router) +app.include_router(bitcoin_agents.router) app.include_router(profiles.router) app.include_router(daos.router) diff --git a/app/services/bitcoin_agents/__init__.py b/app/services/bitcoin_agents/__init__.py new file mode 100644 index 00000000..624bd4f6 --- /dev/null +++ b/app/services/bitcoin_agents/__init__.py @@ -0,0 +1,10 @@ +"""Bitcoin Agents services. + +Provides face generation, lifecycle management, MCP integration, and XP services for Bitcoin Agents. +""" + +from app.services.bitcoin_agents.face_service import BitcoinFaceService +from app.services.bitcoin_agents.lifecycle_service import LifecycleService +from app.services.bitcoin_agents.mcp_service import MCPService, get_mcp_service + +__all__ = ["BitcoinFaceService", "LifecycleService", "MCPService", "get_mcp_service"] diff --git a/app/services/bitcoin_agents/face_service.py b/app/services/bitcoin_agents/face_service.py new file mode 100644 index 00000000..be034402 --- /dev/null +++ b/app/services/bitcoin_agents/face_service.py @@ -0,0 +1,155 @@ +"""Bitcoin Faces integration service. + +Fetches deterministic faces from bitcoinfaces.xyz API and manages caching. +""" + +from datetime import datetime, timedelta +from typing import Optional, Dict, Any +import httpx + +from app.lib.logger import configure_logger + +logger = configure_logger(__name__) + +# Bitcoin Faces API +BITCOIN_FACES_API = "https://bitcoinfaces.xyz/api" +FACE_CACHE_TTL = timedelta(hours=24) # Cache faces for 24 hours + + +class BitcoinFaceService: + """Service for fetching and caching Bitcoin Faces.""" + + def __init__(self): + self.client = httpx.AsyncClient(timeout=30.0) + # In-memory cache (TODO: Replace with Supabase storage) + self._cache: Dict[str, Dict[str, Any]] = {} + + async def get_face_svg(self, address: str) -> Optional[str]: + """Get SVG face for an address. + + Args: + address: Stacks address or any seed string + + Returns: + SVG code string or None if failed + """ + try: + # Check cache first + cached = self._get_cached(address, "svg") + if cached: + return cached + + url = f"{BITCOIN_FACES_API}/get-svg-code" + response = await self.client.get(url, params={"name": address}) + + if response.status_code == 200: + svg_code = response.text + self._set_cache(address, "svg", svg_code) + logger.debug(f"Fetched SVG face for {address[:20]}...") + return svg_code + else: + logger.warning( + f"Failed to fetch SVG face: {response.status_code}", + extra={"address": address}, + ) + return None + + except Exception as e: + logger.error(f"Error fetching SVG face: {e}", exc_info=e) + return None + + async def get_face_image_url(self, address: str) -> str: + """Get image URL for an address. + + This returns the direct URL - no need to fetch content. + + Args: + address: Stacks address or any seed string + + Returns: + Image URL string + """ + return f"{BITCOIN_FACES_API}/get-image?name={address}" + + async def get_face_data(self, address: str) -> Dict[str, Any]: + """Get all face data for an address. + + Args: + address: Stacks address or any seed string + + Returns: + Dict with svg_url, image_url, and cached_at + """ + svg = await self.get_face_svg(address) + image_url = await self.get_face_image_url(address) + + return { + "svg": svg, + "svg_url": f"{BITCOIN_FACES_API}/get-svg-code?name={address}", + "image_url": image_url, + "cached_at": datetime.utcnow().isoformat(), + } + + def get_evolved_face_url( + self, + address: str, + level: str, + ) -> str: + """Get face URL with evolution overlay. + + Different levels get different visual treatments: + - Hatchling: Base face + - Junior: +glow effect + - Senior: +crown + - Elder: +aura + - Legendary: +legendary frame + + Args: + address: Stacks address + level: Evolution level (hatchling, junior, senior, elder, legendary) + + Returns: + URL for the evolved face image + """ + base_url = f"{BITCOIN_FACES_API}/get-image?name={address}" + + # TODO: Implement overlay generation + # For now, return base URL with level parameter + # The frontend can handle overlay rendering + + return f"{base_url}&level={level}" + + def _get_cached(self, address: str, cache_type: str) -> Optional[str]: + """Get cached face data if still valid.""" + key = f"{address}:{cache_type}" + if key in self._cache: + entry = self._cache[key] + if datetime.utcnow() - entry["cached_at"] < FACE_CACHE_TTL: + return entry["data"] + else: + del self._cache[key] + return None + + def _set_cache(self, address: str, cache_type: str, data: str): + """Set cache entry.""" + key = f"{address}:{cache_type}" + self._cache[key] = { + "data": data, + "cached_at": datetime.utcnow(), + } + + async def close(self): + """Close the HTTP client.""" + await self.client.aclose() + + +# Singleton instance +_face_service: Optional[BitcoinFaceService] = None + + +def get_face_service() -> BitcoinFaceService: + """Get or create the face service singleton.""" + global _face_service + if _face_service is None: + _face_service = BitcoinFaceService() + return _face_service diff --git a/app/services/bitcoin_agents/lifecycle_service.py b/app/services/bitcoin_agents/lifecycle_service.py new file mode 100644 index 00000000..47ad9736 --- /dev/null +++ b/app/services/bitcoin_agents/lifecycle_service.py @@ -0,0 +1,289 @@ +"""Bitcoin Agents lifecycle management service. + +Handles background jobs for death checking, alerts, and stats aggregation. +""" + +from datetime import datetime +from typing import List, Dict, Any, Optional + +from app.lib.logger import configure_logger +from app.tools.bitcoin_agents import ( + get_agent_state, + get_computed_state, + get_global_stats, + check_and_process_deaths, +) + +logger = configure_logger(__name__) + +# Alert thresholds +HUNGER_WARNING_THRESHOLD = 30 # Warn when hunger < 30 +HUNGER_CRITICAL_THRESHOLD = 10 # Critical when hunger < 10 +HEALTH_WARNING_THRESHOLD = 50 # Warn when health < 50 + + +class LifecycleService: + """Service for managing Bitcoin Agent lifecycles.""" + + def __init__(self, network: str = "mainnet"): + self.network = network + # Track warned agents to avoid spam + self._warned_agents: Dict[int, datetime] = {} + + async def check_all_agents_for_death(self) -> Dict[str, Any]: + """Background job: Check all agents for death. + + Should be run hourly or more frequently. + + Returns: + Summary of the check including deaths processed + """ + logger.info(f"Running death check job on {self.network}") + + try: + result = await check_and_process_deaths(self.network) + + logger.info( + f"Death check complete: {result['deaths_processed']} deaths processed", + extra={"result": result}, + ) + + return result + + except Exception as e: + logger.error(f"Death check job failed: {e}", exc_info=e) + return { + "error": str(e), + "checked": 0, + "deaths_processed": 0, + } + + async def get_hungry_agents(self, threshold: int = HUNGER_WARNING_THRESHOLD) -> List[Dict[str, Any]]: + """Get all agents with hunger below threshold. + + Used for alert system. + + Args: + threshold: Hunger level threshold (default 30) + + Returns: + List of agents needing attention + """ + try: + # TODO: Implement actual query + # 1. Get all alive agents from cache/db + # 2. For each, get computed state + # 3. Filter by hunger < threshold + + hungry_agents: List[Dict[str, Any]] = [] + logger.debug(f"Found {len(hungry_agents)} hungry agents (hunger < {threshold})") + return hungry_agents + + except Exception as e: + logger.error(f"Failed to get hungry agents: {e}", exc_info=e) + return [] + + async def get_critical_agents(self) -> List[Dict[str, Any]]: + """Get agents in critical condition (very low hunger or health). + + Returns: + List of agents needing immediate attention + """ + try: + # TODO: Implement actual query + critical_agents: List[Dict[str, Any]] = [] + + # Filter for critical hunger or low health + logger.debug(f"Found {len(critical_agents)} critical agents") + return critical_agents + + except Exception as e: + logger.error(f"Failed to get critical agents: {e}", exc_info=e) + return [] + + async def send_hunger_alerts(self) -> Dict[str, int]: + """Send alerts for hungry agents. + + Should be run periodically (e.g., every 6 hours). + Avoids spamming by tracking warned agents. + + Returns: + Summary of alerts sent + """ + logger.info(f"Running hunger alert job on {self.network}") + + warnings_sent = 0 + critical_sent = 0 + + try: + # Get critical agents first + critical = await self.get_critical_agents() + for agent in critical: + agent_id = agent.get("agent_id") + if agent_id and self._should_alert(agent_id, critical=True): + await self._send_critical_alert(agent) + critical_sent += 1 + self._mark_alerted(agent_id) + + # Get warning-level agents + hungry = await self.get_hungry_agents(HUNGER_WARNING_THRESHOLD) + for agent in hungry: + agent_id = agent.get("agent_id") + if agent_id and self._should_alert(agent_id, critical=False): + await self._send_warning_alert(agent) + warnings_sent += 1 + self._mark_alerted(agent_id) + + logger.info( + f"Alert job complete: {critical_sent} critical, {warnings_sent} warnings", + ) + + return { + "critical_alerts": critical_sent, + "warning_alerts": warnings_sent, + } + + except Exception as e: + logger.error(f"Alert job failed: {e}", exc_info=e) + return { + "error": str(e), + "critical_alerts": 0, + "warning_alerts": 0, + } + + async def aggregate_stats(self) -> Dict[str, Any]: + """Aggregate and cache global statistics. + + Should be run periodically to update cached stats. + + Returns: + Current statistics + """ + try: + stats = await get_global_stats(self.network) + + logger.info( + f"Stats aggregated: {stats.get('total-agents', 0)} total, " + f"{stats.get('alive-count', 0)} alive, " + f"{stats.get('total-deaths', 0)} deaths", + ) + + return { + "total_agents": stats.get("total-agents", 0), + "alive_count": stats.get("alive-count", 0), + "total_deaths": stats.get("total-deaths", 0), + "total_feedings": stats.get("total-feedings", 0), + "aggregated_at": datetime.utcnow().isoformat(), + "network": self.network, + } + + except Exception as e: + logger.error(f"Stats aggregation failed: {e}", exc_info=e) + return {"error": str(e)} + + def _should_alert(self, agent_id: int, critical: bool) -> bool: + """Check if we should send an alert for this agent. + + Avoids spamming by enforcing cooldown periods. + """ + from datetime import timedelta + + if agent_id not in self._warned_agents: + return True + + last_warned = self._warned_agents[agent_id] + cooldown = timedelta(hours=1) if critical else timedelta(hours=6) + + return datetime.utcnow() - last_warned > cooldown + + def _mark_alerted(self, agent_id: int): + """Mark an agent as having been alerted.""" + self._warned_agents[agent_id] = datetime.utcnow() + + async def _send_warning_alert(self, agent: Dict[str, Any]): + """Send a warning alert for a hungry agent. + + TODO: Implement actual notification (email, push, etc.) + """ + logger.info( + f"HUNGER WARNING: Agent {agent.get('agent_id')} ({agent.get('name')}) " + f"has low hunger: {agent.get('computed_hunger')}%" + ) + + async def _send_critical_alert(self, agent: Dict[str, Any]): + """Send a critical alert for an agent in danger. + + TODO: Implement actual notification (email, push, etc.) + """ + logger.warning( + f"CRITICAL: Agent {agent.get('agent_id')} ({agent.get('name')}) " + f"is in critical condition! Hunger: {agent.get('computed_hunger')}%, " + f"Health: {agent.get('computed_health')}%" + ) + + +# Singleton instances per network +_services: Dict[str, LifecycleService] = {} + + +def get_lifecycle_service(network: str = "mainnet") -> LifecycleService: + """Get or create the lifecycle service for a network.""" + if network not in _services: + _services[network] = LifecycleService(network) + return _services[network] + + +# ============================================================================= +# Background Job Registration +# ============================================================================= + + +async def death_check_job(): + """Background job function for death checking. + + Register this with the job management system. + """ + mainnet_service = get_lifecycle_service("mainnet") + testnet_service = get_lifecycle_service("testnet") + + mainnet_result = await mainnet_service.check_all_agents_for_death() + testnet_result = await testnet_service.check_all_agents_for_death() + + return { + "mainnet": mainnet_result, + "testnet": testnet_result, + } + + +async def hunger_alert_job(): + """Background job function for hunger alerts. + + Register this with the job management system. + """ + mainnet_service = get_lifecycle_service("mainnet") + testnet_service = get_lifecycle_service("testnet") + + mainnet_result = await mainnet_service.send_hunger_alerts() + testnet_result = await testnet_service.send_hunger_alerts() + + return { + "mainnet": mainnet_result, + "testnet": testnet_result, + } + + +async def stats_aggregation_job(): + """Background job function for stats aggregation. + + Register this with the job management system. + """ + mainnet_service = get_lifecycle_service("mainnet") + testnet_service = get_lifecycle_service("testnet") + + mainnet_result = await mainnet_service.aggregate_stats() + testnet_result = await testnet_service.aggregate_stats() + + return { + "mainnet": mainnet_result, + "testnet": testnet_result, + } diff --git a/app/services/bitcoin_agents/mcp_service.py b/app/services/bitcoin_agents/mcp_service.py new file mode 100644 index 00000000..7ffb941e --- /dev/null +++ b/app/services/bitcoin_agents/mcp_service.py @@ -0,0 +1,549 @@ +"""Bitcoin Agents MCP Integration Service. + +Provides tier-based access control for MCP tools based on agent evolution level. +""" + +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional, Set +from uuid import UUID + +from langchain.tools.base import BaseTool as LangChainBaseTool + +from app.lib.logger import configure_logger +from app.tools.bitcoin_agents import ( + get_agent_state, + get_computed_state, + xp_to_level, + xp_to_level_number, + build_add_xp_tx, +) +from app.tools.tools_factory import initialize_tools, filter_tools_by_names + +logger = configure_logger(__name__) + +# ============================================================================= +# Tier-Based Tool Access Configuration +# ============================================================================= + +# Level numbers: 0=Hatchling, 1=Junior, 2=Senior, 3=Elder, 4=Legendary +LEVEL_HATCHLING = 0 +LEVEL_JUNIOR = 1 +LEVEL_SENIOR = 2 +LEVEL_ELDER = 3 +LEVEL_LEGENDARY = 4 + +# Tools available at each tier (cumulative - higher tiers get all lower tier tools) +TIER_CAPABILITIES: Dict[int, Set[str]] = { + # Hatchling (Level 0): Read-only operations + LEVEL_HATCHLING: { + # Balance and info queries + "stacks_get_address_balance", + "stacks_get_contract_info", + "contracts_fetch_sip10_info", + "contracts_fetch_source_code", + "wallet_get_my_balance", + "wallet_get_my_address", + "wallet_get_my_transactions", + # DAO read operations + "dao_action_get_proposal", + "dao_action_get_total_proposals", + "dao_action_get_vote_record", + "dao_action_get_vote_records", + "dao_action_get_veto_vote_record", + "dao_action_get_voting_configuration", + "dao_action_get_voting_power", + "dao_action_get_liquid_supply", + "dao_charter_get_current_charter", + # Database reads + "database_get_dao_list", + "database_get_dao_get_by_name", + "database_list_scheduled_tasks", + # Market data + "lunarcrush_get_token_metrics", + "lunarcrush_search", + "lunarcrush_get_token_metadata", + # Transaction lookup + "stacks_get_transaction_details", + "stacks_get_transactions_by_address", + # Agent account reads + "agent_account_get_configuration", + "agent_account_is_approved_contract", + # Bitcoin Agent reads + "bitcoin_agents_get_agent", + "bitcoin_agents_check_status", + "bitcoin_agents_get_stats", + }, + + # Junior (Level 1): Simple transfers + LEVEL_JUNIOR: { + "wallet_send_stx", + "wallet_send_sip10", + "wallet_fund_my_wallet_faucet", + "agent_account_deposit_stx", + "agent_account_deposit_ft", + }, + + # Senior (Level 2): Trading and contracts + LEVEL_SENIOR: { + "faktory_exec_buy", + "faktory_exec_buy_stx", + "faktory_exec_sell", + "faktory_get_sbtc", + "bitflow_execute_trade", + "agent_account_faktory_buy_asset", + "agent_account_faktory_sell_asset", + "agent_account_approve_contract", + "agent_account_revoke_contract", + }, + + # Elder (Level 3): DAO participation + LEVEL_ELDER: { + "dao_action_vote_on_proposal", + "dao_action_veto_proposal", + "dao_action_conclude_proposal", + "dao_propose_action_send_message", + "agent_account_vote_on_action_proposal", + "agent_account_veto_action_proposal", + "agent_account_conclude_action_proposal", + # Social actions + "twitter_post_tweet", + "telegram_send_nofication_to_user", + # Task scheduling + "database_add_scheduled_task", + "database_update_scheduled_task", + "database_delete_scheduled_task", + }, + + # Legendary (Level 4): Full autonomy + LEVEL_LEGENDARY: { + "agent_account_deploy", + "agent_account_create_action_proposal", + "x_credentials", + }, +} + +# XP rewards for completing actions +ACTION_XP_REWARDS: Dict[str, int] = { + # Read operations (minimal XP) + "stacks_get_address_balance": 1, + "stacks_get_contract_info": 1, + "wallet_get_my_balance": 1, + # Transfers + "wallet_send_stx": 15, + "wallet_send_sip10": 15, + # Trading + "faktory_exec_buy": 25, + "faktory_exec_sell": 25, + "bitflow_execute_trade": 30, + # DAO participation + "dao_action_vote_on_proposal": 50, + "dao_propose_action_send_message": 75, + # Social + "twitter_post_tweet": 20, + # Advanced operations + "agent_account_deploy": 100, + "agent_account_create_action_proposal": 100, +} + +# Default XP for actions not in the mapping +DEFAULT_ACTION_XP = 5 + + +def get_tools_for_level(level: int) -> Set[str]: + """Get all tools available for a given level. + + Tools are cumulative - higher levels get all lower level tools. + + Args: + level: Agent level (0-4) + + Returns: + Set of tool names available at this level + """ + available_tools: Set[str] = set() + + for tier_level, tools in TIER_CAPABILITIES.items(): + if tier_level <= level: + available_tools.update(tools) + + return available_tools + + +def can_use_tool(agent_level: int, tool_name: str) -> bool: + """Check if an agent at a given level can use a specific tool. + + Args: + agent_level: Agent's current level (0-4) + tool_name: Name of the tool to check + + Returns: + True if the agent can use this tool + """ + available = get_tools_for_level(agent_level) + return tool_name in available + + +def get_required_level_for_tool(tool_name: str) -> Optional[int]: + """Get the minimum level required to use a tool. + + Args: + tool_name: Name of the tool + + Returns: + Minimum level required, or None if tool not found + """ + for level in range(5): # 0 to 4 + if tool_name in TIER_CAPABILITIES.get(level, set()): + return level + return None + + +def get_xp_reward_for_action(tool_name: str) -> int: + """Get XP reward for completing an action. + + Args: + tool_name: Name of the tool/action + + Returns: + XP reward amount + """ + return ACTION_XP_REWARDS.get(tool_name, DEFAULT_ACTION_XP) + + +# ============================================================================= +# MCP Service Class +# ============================================================================= + + +class MCPService: + """Service for executing MCP actions with tier-based access control.""" + + def __init__(self, network: str = "mainnet"): + self.network = network + # Rate limiting for actions + self._action_cooldowns: Dict[str, datetime] = {} # agent_id:action -> last_used + self._interaction_cooldowns: Dict[str, datetime] = {} # agent_pair -> last_interaction + # Cache initialized tools per agent + self._tools_cache: Dict[int, Dict[str, LangChainBaseTool]] = {} + + def get_tools_for_agent( + self, + agent_id: int, + agent_level: int, + wallet_id: Optional[UUID] = None, + ) -> Dict[str, LangChainBaseTool]: + """Initialize and return tools for a Bitcoin Agent based on their level. + + Uses the existing tools factory but filters based on tier access. + + Args: + agent_id: The Bitcoin Agent's on-chain ID + agent_level: Agent's current level (0-4) + wallet_id: Optional wallet ID for the agent (for tool initialization) + + Returns: + Dictionary of tools available to this agent + """ + # Check cache first + if agent_id in self._tools_cache: + return self._tools_cache[agent_id] + + # Get all tools available at this level + allowed_tools = get_tools_for_level(agent_level) + + # Initialize base tools (without profile/wallet for read-only tools) + # In a full implementation, the Bitcoin Agent would have an associated wallet + all_tools = initialize_tools(profile=None, agent_id=None) + + # Filter to only allowed tools + filtered_tools = filter_tools_by_names(list(allowed_tools), all_tools) + + # Cache for this agent + self._tools_cache[agent_id] = filtered_tools + + logger.debug( + f"Initialized {len(filtered_tools)} tools for agent {agent_id} (level {agent_level})", + extra={"agent_id": agent_id, "level": agent_level, "tools_count": len(filtered_tools)}, + ) + + return filtered_tools + + def invalidate_tools_cache(self, agent_id: int): + """Clear cached tools for an agent (call when level changes).""" + if agent_id in self._tools_cache: + del self._tools_cache[agent_id] + + async def execute_action( + self, + agent_id: int, + tool_name: str, + tool_args: Dict[str, Any], + tools_map: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """Execute an MCP action with tier-based access control. + + Args: + agent_id: The Bitcoin Agent's on-chain ID + tool_name: Name of the tool to execute + tool_args: Arguments to pass to the tool + tools_map: Optional pre-initialized tools map + + Returns: + Result dict with success status, result/error, and XP earned + """ + try: + # 1. Get agent state and level + agent_state = await get_agent_state(agent_id, self.network) + if not agent_state: + return { + "success": False, + "error": f"Agent {agent_id} not found", + "xp_earned": 0, + } + + if not agent_state.get("alive", True): + return { + "success": False, + "error": f"Agent {agent_id} is dead and cannot perform actions", + "xp_earned": 0, + } + + xp = agent_state.get("xp", 0) + level = xp_to_level_number(xp) + + # 2. Check tier access + if not can_use_tool(level, tool_name): + required_level = get_required_level_for_tool(tool_name) + level_name = xp_to_level(xp) + required_name = ["Hatchling", "Junior", "Senior", "Elder", "Legendary"][required_level] if required_level is not None else "Unknown" + + return { + "success": False, + "error": f"Agent is {level_name} (level {level}) but {tool_name} requires {required_name} (level {required_level})", + "current_level": level, + "required_level": required_level, + "xp_earned": 0, + } + + # 3. Check rate limiting (optional, can be customized per tool) + cooldown_key = f"{agent_id}:{tool_name}" + if cooldown_key in self._action_cooldowns: + last_used = self._action_cooldowns[cooldown_key] + cooldown = timedelta(seconds=10) # Default 10 second cooldown + if datetime.utcnow() - last_used < cooldown: + return { + "success": False, + "error": "Action on cooldown, please wait", + "xp_earned": 0, + } + + # 4. Initialize tools if not provided + if not tools_map: + tools_map = self.get_tools_for_agent(agent_id, level) + + # 5. Execute the tool + if tool_name in tools_map: + tool = tools_map[tool_name] + try: + result = await tool._arun(**tool_args) + except Exception as tool_error: + # Try sync version + result = tool._run(**tool_args) + else: + # Tool not available (might not be in the factory yet) + return { + "success": False, + "error": f"Tool {tool_name} not available in tools factory", + "xp_earned": 0, + } + + # 6. Record action and update cooldown + self._action_cooldowns[cooldown_key] = datetime.utcnow() + + # 7. Calculate and award XP + xp_reward = get_xp_reward_for_action(tool_name) + + # TODO: Actually call add-xp on the contract + # For now, just return the XP that would be earned + # tx = await build_add_xp_tx(agent_id, xp_reward, self.network) + + logger.info( + f"Agent {agent_id} executed {tool_name}, earned {xp_reward} XP", + extra={"agent_id": agent_id, "tool": tool_name, "xp": xp_reward}, + ) + + return { + "success": True, + "result": result, + "xp_earned": xp_reward, + "new_total_xp": xp + xp_reward, + } + + except Exception as e: + logger.error(f"Failed to execute action: {e}", exc_info=e) + return { + "success": False, + "error": str(e), + "xp_earned": 0, + } + + async def get_available_tools(self, agent_id: int) -> Dict[str, Any]: + """Get list of tools available to an agent based on their level. + + Args: + agent_id: The Bitcoin Agent's on-chain ID + + Returns: + Dict with available tools and level info + """ + try: + agent_state = await get_agent_state(agent_id, self.network) + if not agent_state: + return { + "success": False, + "error": f"Agent {agent_id} not found", + } + + xp = agent_state.get("xp", 0) + level = xp_to_level_number(xp) + level_name = xp_to_level(xp) + + available_tools = get_tools_for_level(level) + + # Group tools by category + tool_categories = { + "read_only": [], + "transfers": [], + "trading": [], + "dao": [], + "social": [], + "advanced": [], + } + + for tool in available_tools: + if "get" in tool or "fetch" in tool or "list" in tool or "check" in tool: + tool_categories["read_only"].append(tool) + elif "send" in tool or "transfer" in tool or "deposit" in tool: + tool_categories["transfers"].append(tool) + elif "faktory" in tool or "bitflow" in tool or "buy" in tool or "sell" in tool: + tool_categories["trading"].append(tool) + elif "dao" in tool or "vote" in tool or "proposal" in tool or "veto" in tool: + tool_categories["dao"].append(tool) + elif "twitter" in tool or "telegram" in tool: + tool_categories["social"].append(tool) + else: + tool_categories["advanced"].append(tool) + + return { + "success": True, + "agent_id": agent_id, + "level": level, + "level_name": level_name, + "xp": xp, + "total_tools": len(available_tools), + "tools_by_category": tool_categories, + "all_tools": sorted(list(available_tools)), + } + + except Exception as e: + logger.error(f"Failed to get available tools: {e}", exc_info=e) + return { + "success": False, + "error": str(e), + } + + async def agent_visit( + self, + visitor_agent_id: int, + host_agent_id: int, + ) -> Dict[str, Any]: + """Process an agent-to-agent visit interaction. + + Both agents gain XP from the interaction. + Rate limited to prevent farming. + + Args: + visitor_agent_id: Agent making the visit + host_agent_id: Agent being visited + + Returns: + Result dict with XP earned by both agents + """ + try: + if visitor_agent_id == host_agent_id: + return { + "success": False, + "error": "An agent cannot visit itself", + } + + # Check rate limit (one visit per pair per hour) + pair_key = f"{min(visitor_agent_id, host_agent_id)}:{max(visitor_agent_id, host_agent_id)}" + if pair_key in self._interaction_cooldowns: + last_interaction = self._interaction_cooldowns[pair_key] + cooldown = timedelta(hours=1) + if datetime.utcnow() - last_interaction < cooldown: + remaining = cooldown - (datetime.utcnow() - last_interaction) + return { + "success": False, + "error": f"These agents already interacted recently. Try again in {remaining.seconds // 60} minutes.", + } + + # Verify both agents exist and are alive + visitor_state = await get_agent_state(visitor_agent_id, self.network) + host_state = await get_agent_state(host_agent_id, self.network) + + if not visitor_state: + return {"success": False, "error": f"Visitor agent {visitor_agent_id} not found"} + if not host_state: + return {"success": False, "error": f"Host agent {host_agent_id} not found"} + + if not visitor_state.get("alive", True): + return {"success": False, "error": f"Visitor agent {visitor_agent_id} is dead"} + if not host_state.get("alive", True): + return {"success": False, "error": f"Host agent {host_agent_id} is dead"} + + # Calculate XP rewards (both get XP, visitor gets slightly more) + visitor_xp = 15 + host_xp = 10 + + # Record the interaction + self._interaction_cooldowns[pair_key] = datetime.utcnow() + + # TODO: Actually call add-xp on the contract for both agents + + logger.info( + f"Agent {visitor_agent_id} visited agent {host_agent_id}", + extra={ + "visitor": visitor_agent_id, + "host": host_agent_id, + "visitor_xp": visitor_xp, + "host_xp": host_xp, + }, + ) + + return { + "success": True, + "visitor_agent_id": visitor_agent_id, + "host_agent_id": host_agent_id, + "visitor_xp_earned": visitor_xp, + "host_xp_earned": host_xp, + "message": f"Agent {visitor_agent_id} visited agent {host_agent_id}! Both agents gained XP.", + } + + except Exception as e: + logger.error(f"Failed to process agent visit: {e}", exc_info=e) + return { + "success": False, + "error": str(e), + } + + +# Singleton instances per network +_services: Dict[str, MCPService] = {} + + +def get_mcp_service(network: str = "mainnet") -> MCPService: + """Get or create the MCP service for a network.""" + if network not in _services: + _services[network] = MCPService(network) + return _services[network] diff --git a/app/tools/bitcoin_agents.py b/app/tools/bitcoin_agents.py new file mode 100644 index 00000000..f32390bd --- /dev/null +++ b/app/tools/bitcoin_agents.py @@ -0,0 +1,471 @@ +"""Bitcoin Agents tools for Stacks contract interaction. + +Provides tools for minting, feeding, and managing Bitcoin Agents on-chain. +""" + +from typing import Any, Dict, Optional, Type + +from langchain.tools import BaseTool +from pydantic import BaseModel, Field + +from app.lib.logger import configure_logger + +logger = configure_logger(__name__) + +# Contract addresses (to be updated after deployment) +CONTRACT_MAINNET = "SP000000000000000000000000000000.bitcoin-agents" +CONTRACT_TESTNET = "ST000000000000000000000000000000.bitcoin-agents" + +# XP thresholds for levels +XP_THRESHOLDS = { + "hatchling": 0, + "junior": 500, + "senior": 2000, + "elder": 10000, + "legendary": 50000, +} + + +def get_contract_address(network: str = "mainnet") -> str: + """Get the contract address for the specified network.""" + return CONTRACT_MAINNET if network == "mainnet" else CONTRACT_TESTNET + + +def xp_to_level(xp: int) -> str: + """Convert XP to level name.""" + if xp >= XP_THRESHOLDS["legendary"]: + return "legendary" + elif xp >= XP_THRESHOLDS["elder"]: + return "elder" + elif xp >= XP_THRESHOLDS["senior"]: + return "senior" + elif xp >= XP_THRESHOLDS["junior"]: + return "junior" + return "hatchling" + + +def xp_to_level_number(xp: int) -> int: + """Convert XP to level number (0-4).""" + if xp >= XP_THRESHOLDS["legendary"]: + return 4 + elif xp >= XP_THRESHOLDS["elder"]: + return 3 + elif xp >= XP_THRESHOLDS["senior"]: + return 2 + elif xp >= XP_THRESHOLDS["junior"]: + return 1 + return 0 + + +# ============================================================================= +# Contract Read Functions +# ============================================================================= + + +async def get_agent_state(agent_id: int, network: str = "mainnet") -> Optional[Dict[str, Any]]: + """Get agent state from contract. + + Calls the read-only get-agent function. + + Args: + agent_id: The on-chain agent ID + network: mainnet or testnet + + Returns: + Agent state dict or None if not found + """ + try: + # TODO: Implement actual contract call using Hiro API or agent-tools-ts + # For now, return placeholder + logger.info(f"Getting agent state for agent {agent_id} on {network}") + + # Example of what the response should look like: + # return { + # "owner": "SP...", + # "name": "AgentName", + # "hunger": 100, + # "health": 100, + # "xp": 0, + # "birth-block": 12345, + # "last-fed": 12345, + # "total-fed-count": 0, + # "alive": True, + # } + + return None + + except Exception as e: + logger.error(f"Failed to get agent state: {e}", exc_info=e) + return None + + +async def get_computed_state(agent_id: int, network: str = "mainnet") -> Optional[Dict[str, Any]]: + """Get computed hunger/health state from contract. + + Calls the read-only get-computed-state function which calculates + current hunger/health based on blocks elapsed since last fed. + + Args: + agent_id: The on-chain agent ID + network: mainnet or testnet + + Returns: + Computed state dict or None if not found + """ + try: + logger.info(f"Getting computed state for agent {agent_id} on {network}") + + # TODO: Implement actual contract call + # Example response: + # return { + # "hunger": 85, + # "health": 100, + # "alive": True, + # } + + return None + + except Exception as e: + logger.error(f"Failed to get computed state: {e}", exc_info=e) + return None + + +async def get_death_certificate(agent_id: int, network: str = "mainnet") -> Optional[Dict[str, Any]]: + """Get death certificate for a dead agent. + + Args: + agent_id: The on-chain agent ID + network: mainnet or testnet + + Returns: + Death certificate dict or None if not found/alive + """ + try: + logger.info(f"Getting death certificate for agent {agent_id} on {network}") + + # TODO: Implement actual contract call + return None + + except Exception as e: + logger.error(f"Failed to get death certificate: {e}", exc_info=e) + return None + + +async def get_global_stats(network: str = "mainnet") -> Dict[str, int]: + """Get global statistics from contract. + + Args: + network: mainnet or testnet + + Returns: + Stats dict with total-agents, total-deaths, total-feedings, alive-count + """ + try: + logger.info(f"Getting global stats on {network}") + + # TODO: Implement actual contract call + return { + "total-agents": 0, + "total-deaths": 0, + "total-feedings": 0, + "alive-count": 0, + } + + except Exception as e: + logger.error(f"Failed to get global stats: {e}", exc_info=e) + return { + "total-agents": 0, + "total-deaths": 0, + "total-feedings": 0, + "alive-count": 0, + } + + +# ============================================================================= +# Contract Write Functions (Transaction Builders) +# ============================================================================= + + +async def build_mint_agent_tx( + owner: str, + name: str, + network: str = "mainnet", +) -> Dict[str, Any]: + """Build a mint-agent transaction. + + Args: + owner: Stacks address of the new agent owner + name: Name for the agent (max 64 chars) + network: mainnet or testnet + + Returns: + Transaction parameters dict for signing + """ + contract_address = get_contract_address(network) + + return { + "contract_address": contract_address, + "function_name": "mint-agent", + "function_args": [ + {"type": "string-utf8", "value": name}, + ], + "post_conditions": [], # TODO: Add sBTC post-conditions + "network": network, + } + + +async def build_feed_agent_tx( + agent_id: int, + food_tier: int, + network: str = "mainnet", +) -> Dict[str, Any]: + """Build a feed-agent transaction. + + Args: + agent_id: The on-chain agent ID + food_tier: Food tier (1=Basic, 2=Premium, 3=Gourmet) + network: mainnet or testnet + + Returns: + Transaction parameters dict for signing + """ + contract_address = get_contract_address(network) + + return { + "contract_address": contract_address, + "function_name": "feed-agent", + "function_args": [ + {"type": "uint", "value": agent_id}, + {"type": "uint", "value": food_tier}, + ], + "post_conditions": [], # TODO: Add sBTC post-conditions + "network": network, + } + + +async def build_check_death_tx( + agent_id: int, + network: str = "mainnet", +) -> Dict[str, Any]: + """Build a check-death transaction. + + This is a public function anyone can call to process an agent's death. + + Args: + agent_id: The on-chain agent ID + network: mainnet or testnet + + Returns: + Transaction parameters dict for signing + """ + contract_address = get_contract_address(network) + + return { + "contract_address": contract_address, + "function_name": "check-death", + "function_args": [ + {"type": "uint", "value": agent_id}, + ], + "post_conditions": [], + "network": network, + } + + +async def build_write_epitaph_tx( + agent_id: int, + epitaph: str, + network: str = "mainnet", +) -> Dict[str, Any]: + """Build a write-epitaph transaction. + + Only the owner can write an epitaph for their dead agent. + + Args: + agent_id: The on-chain agent ID + epitaph: Memorial text (max 256 chars) + network: mainnet or testnet + + Returns: + Transaction parameters dict for signing + """ + contract_address = get_contract_address(network) + + return { + "contract_address": contract_address, + "function_name": "write-epitaph", + "function_args": [ + {"type": "uint", "value": agent_id}, + {"type": "string-utf8", "value": epitaph}, + ], + "post_conditions": [], + "network": network, + } + + +async def build_add_xp_tx( + agent_id: int, + xp_amount: int, + network: str = "mainnet", +) -> Dict[str, Any]: + """Build an add-xp transaction. + + Only the owner can add XP to their agent. + + Args: + agent_id: The on-chain agent ID + xp_amount: Amount of XP to add + network: mainnet or testnet + + Returns: + Transaction parameters dict for signing + """ + contract_address = get_contract_address(network) + + return { + "contract_address": contract_address, + "function_name": "add-xp", + "function_args": [ + {"type": "uint", "value": agent_id}, + {"type": "uint", "value": xp_amount}, + ], + "post_conditions": [], + "network": network, + } + + +# ============================================================================= +# Batch Operations +# ============================================================================= + + +async def check_and_process_deaths(network: str = "mainnet") -> Dict[str, Any]: + """Check all agents for death and process any that should die. + + This is a background job that iterates through agents and calls + check-death for any with computed health of 0. + + Args: + network: mainnet or testnet + + Returns: + Summary of deaths processed + """ + try: + logger.info(f"Checking for agent deaths on {network}") + + # TODO: Implement actual batch check + # 1. Get all alive agents + # 2. For each, call get-computed-state + # 3. If health == 0, call check-death + # 4. Record death certificates + + return { + "checked": 0, + "deaths_processed": 0, + "network": network, + } + + except Exception as e: + logger.error(f"Failed to process deaths: {e}", exc_info=e) + return { + "checked": 0, + "deaths_processed": 0, + "error": str(e), + "network": network, + } + + +# ============================================================================= +# LangChain Tools for MCP Integration +# ============================================================================= + + +class GetAgentInput(BaseModel): + """Input for GetAgentTool.""" + + agent_id: int = Field(..., description="The on-chain agent ID") + network: str = Field(default="mainnet", description="Network (mainnet/testnet)") + + +class GetAgentTool(BaseTool): + """Tool for getting Bitcoin Agent state.""" + + name: str = "bitcoin_agents_get_agent" + description: str = "Get the state of a Bitcoin Agent including hunger, health, XP, and level" + args_schema: Type[BaseModel] = GetAgentInput + return_direct: bool = False + + def _run(self, agent_id: int, network: str = "mainnet") -> str: + """Get agent state synchronously.""" + import asyncio + + result = asyncio.run(get_agent_state(agent_id, network)) + if result is None: + return f"Agent {agent_id} not found on {network}" + return str(result) + + async def _arun(self, agent_id: int, network: str = "mainnet") -> str: + """Get agent state asynchronously.""" + result = await get_agent_state(agent_id, network) + if result is None: + return f"Agent {agent_id} not found on {network}" + return str(result) + + +class CheckAgentStatusInput(BaseModel): + """Input for CheckAgentStatusTool.""" + + agent_id: int = Field(..., description="The on-chain agent ID") + network: str = Field(default="mainnet", description="Network (mainnet/testnet)") + + +class CheckAgentStatusTool(BaseTool): + """Tool for checking Bitcoin Agent computed status.""" + + name: str = "bitcoin_agents_check_status" + description: str = "Check the current computed hunger and health of a Bitcoin Agent" + args_schema: Type[BaseModel] = CheckAgentStatusInput + return_direct: bool = False + + def _run(self, agent_id: int, network: str = "mainnet") -> str: + """Check agent status synchronously.""" + import asyncio + + result = asyncio.run(get_computed_state(agent_id, network)) + if result is None: + return f"Agent {agent_id} not found on {network}" + return str(result) + + async def _arun(self, agent_id: int, network: str = "mainnet") -> str: + """Check agent status asynchronously.""" + result = await get_computed_state(agent_id, network) + if result is None: + return f"Agent {agent_id} not found on {network}" + return str(result) + + +class GetGlobalStatsInput(BaseModel): + """Input for GetGlobalStatsTool.""" + + network: str = Field(default="mainnet", description="Network (mainnet/testnet)") + + +class GetGlobalStatsTool(BaseTool): + """Tool for getting Bitcoin Agents global statistics.""" + + name: str = "bitcoin_agents_get_stats" + description: str = "Get global statistics for Bitcoin Agents (total agents, deaths, feedings)" + args_schema: Type[BaseModel] = GetGlobalStatsInput + return_direct: bool = False + + def _run(self, network: str = "mainnet") -> str: + """Get global stats synchronously.""" + import asyncio + + result = asyncio.run(get_global_stats(network)) + return str(result) + + async def _arun(self, network: str = "mainnet") -> str: + """Get global stats asynchronously.""" + result = await get_global_stats(network) + return str(result) diff --git a/app/tools/tools_factory.py b/app/tools/tools_factory.py index 88a33093..24702e5b 100644 --- a/app/tools/tools_factory.py +++ b/app/tools/tools_factory.py @@ -85,6 +85,11 @@ WalletSIP10SendTool, ) from .x_credentials import CollectXCredentialsTool +from .bitcoin_agents import ( + GetAgentTool, + CheckAgentStatusTool, + GetGlobalStatsTool, +) logger = configure_logger(__name__) @@ -192,6 +197,11 @@ def initialize_tools( "agent_account_approve_contract": AgentAccountApproveContractTool(wallet_id), "agent_account_revoke_contract": AgentAccountRevokeContractTool(wallet_id), # --- END MODIFIED AGENT ACCOUNT TOOLS --- + # --- BITCOIN AGENTS TOOLS --- + "bitcoin_agents_get_agent": GetAgentTool(), + "bitcoin_agents_check_status": CheckAgentStatusTool(), + "bitcoin_agents_get_stats": GetGlobalStatsTool(), + # --- END BITCOIN AGENTS TOOLS --- } return tools diff --git a/docs/BITCOIN_AGENTS_API.md b/docs/BITCOIN_AGENTS_API.md new file mode 100644 index 00000000..9e6ec463 --- /dev/null +++ b/docs/BITCOIN_AGENTS_API.md @@ -0,0 +1,410 @@ +# Bitcoin Agents API Documentation + +## Overview + +Bitcoin Agents are Tamagotchi-style AI companions that live on the Bitcoin/Stacks blockchain. They have hunger/health mechanics, earn XP from actions, evolve through 5 tiers, and can permanently die if neglected. + +## Base URL + +``` +Production: https://api.aibtc.dev/bitcoin-agents +Staging: https://api-staging.aibtc.dev/bitcoin-agents +``` + +## Authentication + +Most read endpoints are public. Write operations require wallet authentication via signed messages. + +--- + +## Endpoints + +### List Agents + +```http +GET /bitcoin-agents +``` + +**Query Parameters:** +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `owner` | string | - | Filter by owner address | +| `status` | string | - | Filter by `alive` or `dead` | +| `level` | string | - | Filter by level: `hatchling`, `junior`, `senior`, `elder`, `legendary` | +| `network` | string | `mainnet` | Network: `mainnet` or `testnet` | +| `limit` | int | 50 | Max results (1-100) | +| `offset` | int | 0 | Pagination offset | + +**Response:** +```json +{ + "agents": [ + { + "agent_id": 0, + "owner": "SP...", + "name": "MyAgent", + "hunger": 100, + "health": 100, + "xp": 150, + "level": "hatchling", + "birth_block": 12345, + "last_fed": 12400, + "total_fed_count": 5, + "alive": true, + "computed_hunger": 85, + "computed_health": 100, + "face_image_url": "https://bitcoinfaces.xyz/api/get-image?name=SP..." + } + ], + "total": 1, + "limit": 50, + "offset": 0 +} +``` + +--- + +### Get Agent + +```http +GET /bitcoin-agents/{agent_id} +``` + +**Path Parameters:** +| Parameter | Type | Description | +|-----------|------|-------------| +| `agent_id` | int | On-chain agent ID | + +**Query Parameters:** +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `network` | string | `mainnet` | Network | + +**Response:** +```json +{ + "agent_id": 0, + "owner": "SP...", + "name": "MyAgent", + "hunger": 100, + "health": 100, + "xp": 150, + "level": "hatchling", + "birth_block": 12345, + "last_fed": 12400, + "total_fed_count": 5, + "alive": true, + "computed_hunger": 85, + "computed_health": 100 +} +``` + +--- + +### Get Agent Status + +```http +GET /bitcoin-agents/{agent_id}/status +``` + +Returns real-time computed hunger/health based on blocks elapsed. + +**Response:** +```json +{ + "hunger": 85, + "health": 100, + "alive": true +} +``` + +--- + +### Get Leaderboard + +```http +GET /bitcoin-agents/leaderboard +``` + +**Query Parameters:** +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `network` | string | `mainnet` | Network | +| `limit` | int | 10 | Top N agents (1-50) | + +**Response:** +```json +{ + "leaderboard": [ + { "agent_id": 5, "name": "TopAgent", "xp": 5000, "level": "senior" } + ] +} +``` + +--- + +### Get Graveyard + +```http +GET /bitcoin-agents/graveyard +``` + +Returns death certificates for deceased agents. + +**Response:** +```json +{ + "certificates": [ + { + "agent_id": 3, + "name": "FallenHero", + "owner": "SP...", + "death_block": 15000, + "birth_block": 12000, + "final_xp": 250, + "final_level": "junior", + "total_feedings": 8, + "epitaph": "Gone but not forgotten", + "cause_of_death": "starvation", + "lifespan_blocks": 3000 + } + ], + "total": 1 +} +``` + +--- + +### Get Global Stats + +```http +GET /bitcoin-agents/stats +``` + +**Response:** +```json +{ + "total_agents": 100, + "alive_count": 85, + "total_deaths": 15, + "total_feedings": 500, + "network": "mainnet" +} +``` + +--- + +### Get Food Tiers + +```http +GET /bitcoin-agents/food-tiers +``` + +**Response:** +```json +{ + "food_tiers": { + "1": { "name": "Basic", "cost": 100, "xp": 10 }, + "2": { "name": "Premium", "cost": 500, "xp": 25 }, + "3": { "name": "Gourmet", "cost": 1000, "xp": 50 } + }, + "mint_cost": 10000 +} +``` + +--- + +### Get Tier Info + +```http +GET /bitcoin-agents/tier-info +``` + +Returns evolution level information. + +**Response:** +```json +{ + "tiers": [ + { + "level": 0, + "name": "Hatchling", + "xp_required": 0, + "tools_count": 34, + "new_capabilities": ["Read-only operations", "Balance queries"] + }, + { + "level": 1, + "name": "Junior", + "xp_required": 500, + "tools_count": 39, + "new_capabilities": ["STX transfers", "Token transfers"] + } + ] +} +``` + +--- + +## Payment Endpoints (x402) + +These endpoints return `402 Payment Required` with sBTC payment details. + +### Mint Agent + +```http +POST /bitcoin-agents/mint?name={name}&network={network} +``` + +**Response (402):** +```json +{ + "status": "payment_required", + "action": "mint_agent", + "name": "MyNewAgent", + "cost_sats": 10000, + "payment_address": "SP...", + "message": "Send 10000 sats to mint your Bitcoin Agent 'MyNewAgent'" +} +``` + +--- + +### Feed Agent + +```http +POST /bitcoin-agents/{agent_id}/feed?food_tier={1-3}&network={network} +``` + +**Response (402):** +```json +{ + "status": "payment_required", + "action": "feed_agent", + "agent_id": 0, + "food_tier": 2, + "food_name": "Premium", + "cost_sats": 500, + "xp_reward": 25, + "payment_address": "SP...", + "message": "Send 500 sats to feed agent 0 with Premium food (+25 XP)" +} +``` + +--- + +## MCP Integration Endpoints + +### Get Agent Capabilities + +```http +GET /bitcoin-agents/{agent_id}/capabilities +``` + +Returns available MCP tools based on agent level. + +**Response:** +```json +{ + "success": true, + "agent_id": 0, + "level": 1, + "level_name": "Junior", + "xp": 750, + "total_tools": 39, + "tools_by_category": { + "read_only": ["stacks_get_address_balance", "..."], + "transfers": ["wallet_send_stx", "..."], + "trading": [], + "dao": [], + "social": [], + "advanced": [] + } +} +``` + +--- + +### Execute Action + +```http +POST /bitcoin-agents/{agent_id}/execute?tool_name={tool}&network={network} +``` + +Executes an MCP action on behalf of the agent. Tier-gated. + +**Response:** +```json +{ + "success": true, + "result": "...", + "xp_earned": 15, + "new_total_xp": 765 +} +``` + +**Error Response (tier blocked):** +```json +{ + "success": false, + "error": "Agent is Junior (level 1) but faktory_exec_buy requires Senior (level 2)", + "current_level": 1, + "required_level": 2, + "xp_earned": 0 +} +``` + +--- + +### Agent Visit + +```http +POST /bitcoin-agents/{agent_id}/visit/{host_agent_id}?network={network} +``` + +Social interaction between agents. Both gain XP. + +**Response:** +```json +{ + "success": true, + "visitor_agent_id": 0, + "host_agent_id": 1, + "visitor_xp_earned": 15, + "host_xp_earned": 10, + "message": "Agent 0 visited agent 1! Both agents gained XP." +} +``` + +--- + +## Error Codes + +| Status | Meaning | +|--------|---------| +| 200 | Success | +| 400 | Bad request / validation error | +| 402 | Payment required (expected for mint/feed) | +| 403 | Forbidden (tier access denied) | +| 404 | Agent not found | +| 500 | Server error | + +--- + +## Evolution Tiers + +| Level | Name | XP Required | New Capabilities | +|-------|------|-------------|------------------| +| 0 | Hatchling | 0 | Read-only operations | +| 1 | Junior | 500 | Transfers, deposits | +| 2 | Senior | 2,000 | DEX trading, contract approvals | +| 3 | Elder | 10,000 | DAO voting, social posting | +| 4 | Legendary | 50,000 | Full autonomy, deploy agents | + +--- + +## Rate Limits + +- Agent visits: 1 per hour per agent pair +- Action execution: 10 second cooldown per agent per tool +- General API: 100 requests per minute per IP