diff --git a/clawdbot/.env.example b/clawdbot/.env.example new file mode 100644 index 0000000..ad565b7 --- /dev/null +++ b/clawdbot/.env.example @@ -0,0 +1,13 @@ +# ClawdBot Environment Configuration +# Copy to .env and fill in values + +# PostgreSQL +PG_USER=clawdbot +PG_PASS=changeme_pg_password + +# Redis +REDIS_PASSWORD=changeme_redis_password + +# API Keys (also store in secrets/ directory for Docker secrets) +# ANTHROPIC_API_KEY=sk-ant-... +# MOONSHOT_API_KEY=sk-... diff --git a/clawdbot/.gitignore b/clawdbot/.gitignore new file mode 100644 index 0000000..739f8f7 --- /dev/null +++ b/clawdbot/.gitignore @@ -0,0 +1,23 @@ +# Secrets (never commit) +secrets/ +.env +*.key +*.pem + +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ + +# Docker volumes +pgdata/ +redis_data/ + +# IDE +.idea/ +.vscode/ +*.swp + +# OS +.DS_Store diff --git a/clawdbot/README.md b/clawdbot/README.md new file mode 100644 index 0000000..910a508 --- /dev/null +++ b/clawdbot/README.md @@ -0,0 +1,96 @@ +# ClawdBot — Multi-Agent Docker System + +A three-bot Docker system powered by Claude Opus 4.6 and Kimi K2.5, designed for deployment from Docker Desktop to Digital Ocean. + +## Architecture + +``` +Nginx (reverse proxy, SSL, rate limiting) + | + ├── Orchestrator Gateway (intent routing, cross-agent coordination) + │ ├── Personal Assistant (Claude Opus 4.6 + Google Calendar) + │ ├── Business Advisor (Kimi K2.5 primary, Claude fallback) + │ └── Code Generator (Claude Opus 4.6 + sandbox execution) + | + ├── Redis 7 (Streams + Pub/Sub + Cache) + └── PostgreSQL 17 + pgvector (State / Memory / RAG) +``` + +## Quick Start + +```bash +# 1. Copy environment template +cp .env.example .env +# Edit .env with your credentials + +# 2. Create secret files +mkdir -p secrets +echo "sk-ant-..." > secrets/anthropic_key.txt +echo "sk-..." > secrets/moonshot_key.txt +echo "changeme" > secrets/pg_password.txt +# Optional: copy Google credentials JSON +# cp ~/google_credentials.json secrets/google_credentials.json + +# 3. Start the stack +docker compose up -d + +# 4. Verify health +curl http://localhost:8000/health +curl http://localhost:8000/agents +``` + +## API Usage + +```bash +# Auto-route to the best agent +curl -X POST http://localhost:80/api/chat \ + -H "Content-Type: application/json" \ + -d '{"message": "Schedule a meeting for tomorrow at 2pm", "agent": "auto"}' + +# Target a specific agent +curl -X POST http://localhost:80/api/chat \ + -H "Content-Type: application/json" \ + -d '{"message": "Write a Python function to merge two sorted lists", "agent": "codegen"}' +``` + +## Agents + +| Agent | Model | Purpose | +|-------|-------|---------| +| **Personal Assistant** | Claude Opus 4.6 | Calendar, scheduling, reminders, daily briefings | +| **Business Advisor** | Kimi K2.5 (Claude fallback) | Market research, strategic analysis, SWOT | +| **Code Generator** | Claude Opus 4.6 | Code generation, review, sandboxed execution | + +## Deployment to Digital Ocean + +```bash +# One-time droplet setup (run as root on a Docker 1-Click Droplet) +bash scripts/setup-droplet.sh + +# Deploy (run from your local machine) +bash scripts/deploy.sh +``` + +## Project Structure + +``` +clawdbot/ +├── docker-compose.yml # Full stack definition +├── .env.example # Environment template +├── agents/ +│ ├── base/Dockerfile # Shared base image pattern +│ ├── orchestrator/ # Gateway + intent router +│ ├── personal-assistant/ # Calendar + scheduling bot +│ ├── business-advisor/ # Research + analysis bot +│ ├── code-generator/ # Code gen + review bot +│ └── sandbox/ # Isolated code execution +├── shared/ +│ └── messaging.py # Redis Streams inter-agent comms +├── nginx/ +│ ├── nginx.conf # Reverse proxy config +│ └── certs/ # SSL certificates +├── scripts/ +│ ├── deploy.sh # Digital Ocean deployment +│ └── setup-droplet.sh # One-time server setup +└── secrets/ # API keys (git-ignored) +``` diff --git a/clawdbot/agents/base/Dockerfile b/clawdbot/agents/base/Dockerfile new file mode 100644 index 0000000..bd25203 --- /dev/null +++ b/clawdbot/agents/base/Dockerfile @@ -0,0 +1,21 @@ +# Base Dockerfile for ClawdBot agents +# Individual agents extend this with their own requirements +FROM python:3.12-slim AS builder +WORKDIR /build +COPY requirements.txt . +RUN pip install --no-cache-dir --target=/deps -r requirements.txt + +FROM python:3.12-slim +RUN groupadd -r agent && useradd -r -g agent -s /bin/bash agent \ + && apt-get update && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY --from=builder /deps /usr/local/lib/python3.12/site-packages +COPY . . +RUN chown -R agent:agent /app +USER agent +ENV PYTHONPATH=/usr/local/lib/python3.12/site-packages +EXPOSE 8000 +HEALTHCHECK --interval=30s --timeout=10s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/clawdbot/agents/business-advisor/Dockerfile b/clawdbot/agents/business-advisor/Dockerfile new file mode 100644 index 0000000..13d3070 --- /dev/null +++ b/clawdbot/agents/business-advisor/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim AS builder +WORKDIR /build +COPY requirements.txt . +RUN pip install --no-cache-dir --target=/deps -r requirements.txt + +FROM python:3.12-slim +RUN groupadd -r agent && useradd -r -g agent -s /bin/bash agent \ + && apt-get update && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY --from=builder /deps /usr/local/lib/python3.12/site-packages +COPY . . +RUN chown -R agent:agent /app +USER agent +ENV PYTHONPATH=/usr/local/lib/python3.12/site-packages +EXPOSE 8000 +HEALTHCHECK --interval=30s --timeout=10s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/clawdbot/agents/business-advisor/app/__init__.py b/clawdbot/agents/business-advisor/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clawdbot/agents/business-advisor/app/main.py b/clawdbot/agents/business-advisor/app/main.py new file mode 100644 index 0000000..7f670de --- /dev/null +++ b/clawdbot/agents/business-advisor/app/main.py @@ -0,0 +1,153 @@ +"""Business Advisor Bot. + +Provides strategic analysis, market research, and decision support. +Uses Kimi K2.5 as primary model (with Agent Swarm for complex queries) +and Claude Opus 4.6 as fallback. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +from anthropic import AsyncAnthropic +from fastapi import FastAPI +from openai import AsyncOpenAI +from pydantic import BaseModel + +app = FastAPI(title="Business Advisor Bot") + + +def read_secret(name: str) -> str: + """Read a Docker secret file, falling back to environment variable.""" + path = Path(f"/run/secrets/{name}") + return path.read_text().strip() if path.exists() else os.environ.get(name.upper(), "") + + +# Kimi K2.5 via OpenAI-compatible API (primary) +kimi_client = AsyncOpenAI( + api_key=read_secret("moonshot_key"), + base_url="https://api.moonshot.ai/v1", +) + +# Claude Opus 4.6 (fallback) +claude_client = AsyncAnthropic(api_key=read_secret("anthropic_key")) + + +# ── Tool definitions (OpenAI format for Kimi) ──────────────── + +ADVISOR_TOOLS = [ + { + "type": "function", + "function": { + "name": "web_search", + "description": ( + "Search the web for market data, news, " + "competitor info, or industry analysis" + ), + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query", + } + }, + "required": ["query"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "analyze_data", + "description": "Run quantitative analysis on provided data", + "parameters": { + "type": "object", + "properties": { + "data": {"type": "string"}, + "analysis_type": { + "type": "string", + "enum": [ + "trend", + "comparison", + "forecast", + "swot", + "financial", + ], + }, + }, + "required": ["data", "analysis_type"], + }, + }, + }, +] + +SYSTEM_PROMPT = """You are a 24/7 business advisor providing strategic \ +analysis, market research, and decision support. You have access to web \ +search for real-time market data and analysis tools. When conducting \ +research, be thorough -- search multiple sources, cross-reference data, \ +and synthesize findings into actionable insights. Always cite your \ +reasoning and flag uncertainty levels. For complex multi-faceted \ +questions, break the research into parallel streams.""" + + +class InvokeRequest(BaseModel): + message: str + session_id: str | None = None + context: dict | None = None + + +@app.post("/invoke") +async def invoke(req: InvokeRequest): + """Route to Kimi K2.5 (primary) or Claude Opus 4.6 (fallback).""" + try: + response = await kimi_client.chat.completions.create( + model=os.environ.get("PRIMARY_MODEL", "kimi-k2.5"), + messages=[ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": req.message}, + ], + tools=ADVISOR_TOOLS, + tool_choice="auto", + temperature=0.7, + max_tokens=8192, + ) + text = response.choices[0].message.content or "" + return { + "response": text, + "session_id": req.session_id, + "summary": text[:100], + "metadata": { + "model": "kimi-k2.5", + "tokens": { + "input": response.usage.prompt_tokens, + "output": response.usage.completion_tokens, + }, + }, + } + except Exception: + # Fallback to Claude Opus 4.6 + fallback_model = os.environ.get("FALLBACK_MODEL", "claude-opus-4-6") + response = await claude_client.messages.create( + model=fallback_model, + max_tokens=8192, + system=SYSTEM_PROMPT, + messages=[{"role": "user", "content": req.message}], + ) + text = "".join( + b.text for b in response.content if hasattr(b, "text") + ) + return { + "response": text, + "session_id": req.session_id, + "summary": text[:100], + "metadata": {"model": fallback_model, "fallback": True}, + } + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "healthy", "agent": "business-advisor"} diff --git a/clawdbot/agents/business-advisor/requirements.txt b/clawdbot/agents/business-advisor/requirements.txt new file mode 100644 index 0000000..e0939a1 --- /dev/null +++ b/clawdbot/agents/business-advisor/requirements.txt @@ -0,0 +1,6 @@ +fastapi>=0.115,<1 +uvicorn[standard]>=0.34,<1 +openai>=1.60,<2 +anthropic>=0.45,<1 +redis>=5.2,<6 +pydantic>=2.10,<3 diff --git a/clawdbot/agents/code-generator/Dockerfile b/clawdbot/agents/code-generator/Dockerfile new file mode 100644 index 0000000..13d3070 --- /dev/null +++ b/clawdbot/agents/code-generator/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim AS builder +WORKDIR /build +COPY requirements.txt . +RUN pip install --no-cache-dir --target=/deps -r requirements.txt + +FROM python:3.12-slim +RUN groupadd -r agent && useradd -r -g agent -s /bin/bash agent \ + && apt-get update && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY --from=builder /deps /usr/local/lib/python3.12/site-packages +COPY . . +RUN chown -R agent:agent /app +USER agent +ENV PYTHONPATH=/usr/local/lib/python3.12/site-packages +EXPOSE 8000 +HEALTHCHECK --interval=30s --timeout=10s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/clawdbot/agents/code-generator/app/__init__.py b/clawdbot/agents/code-generator/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clawdbot/agents/code-generator/app/main.py b/clawdbot/agents/code-generator/app/main.py new file mode 100644 index 0000000..3e6a481 --- /dev/null +++ b/clawdbot/agents/code-generator/app/main.py @@ -0,0 +1,189 @@ +"""Code Generator Bot. + +Generates, reviews, and executes code in an isolated sandbox. +Uses Claude Opus 4.6 with an agentic tool-use loop. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import httpx +from anthropic import AsyncAnthropic +from fastapi import FastAPI +from pydantic import BaseModel + +app = FastAPI(title="Code Generator Bot") + + +def read_secret(name: str) -> str: + """Read a Docker secret file, falling back to environment variable.""" + path = Path(f"/run/secrets/{name}") + return path.read_text().strip() if path.exists() else os.environ.get(name.upper(), "") + + +client = AsyncAnthropic(api_key=read_secret("anthropic_key")) + + +# ── Tool definitions ───────────────────────────────────────── + +CODE_TOOLS = [ + { + "name": "execute_code", + "description": ( + "Execute code in an isolated sandbox. " + "Supports Python, JavaScript/Node.js, Go, Rust, Bash." + ), + "input_schema": { + "type": "object", + "properties": { + "language": { + "type": "string", + "enum": ["python", "javascript", "go", "rust", "bash"], + }, + "code": { + "type": "string", + "description": "Code to execute", + }, + "timeout_seconds": {"type": "integer", "default": 30}, + }, + "required": ["language", "code"], + }, + }, + { + "name": "review_code", + "description": ( + "Analyze code for bugs, security issues, " + "performance, and best practices" + ), + "input_schema": { + "type": "object", + "properties": { + "code": {"type": "string"}, + "language": {"type": "string"}, + "focus": { + "type": "string", + "enum": ["bugs", "security", "performance", "style", "all"], + "default": "all", + }, + }, + "required": ["code", "language"], + }, + }, +] + +SYSTEM_PROMPT = """You are a state-of-the-art code generation and review \ +agent. You write clean, well-documented, production-quality code. You \ +can execute code in a sandboxed environment to verify correctness. When \ +generating code: (1) Plan the approach, (2) Write the implementation, \ +(3) Execute tests to verify, (4) Iterate if tests fail. Always include \ +error handling and type hints. For code review, analyze security, \ +performance, correctness, and maintainability.""" + + +async def run_in_sandbox( + language: str, code: str, timeout: int = 30 +) -> dict: + """Execute code in the isolated sandbox container.""" + async with httpx.AsyncClient() as http: + try: + resp = await http.post( + "http://sandbox-runner:8080/execute", + json={ + "language": language, + "code": code, + "timeout": timeout, + }, + timeout=timeout + 5, + ) + return resp.json() + except Exception as e: + return { + "error": str(e), + "stdout": "", + "stderr": str(e), + "exit_code": 1, + } + + +async def execute_tool(name: str, args: dict) -> str: + """Dispatch tool calls to the appropriate handler.""" + if name == "execute_code": + result = await run_in_sandbox( + args["language"], + args["code"], + args.get("timeout_seconds", 30), + ) + return json.dumps(result) + + elif name == "review_code": + # Code review is handled by the LLM itself — return the code + # for the model to analyze in its next turn. + return json.dumps( + { + "status": "review_requested", + "code": args["code"][:2000], + "language": args["language"], + "focus": args.get("focus", "all"), + } + ) + + return json.dumps({"error": f"Unknown tool: {name}"}) + + +class InvokeRequest(BaseModel): + message: str + session_id: str | None = None + context: dict | None = None + + +@app.post("/invoke") +async def invoke(req: InvokeRequest): + """Handle code generation/review with an agentic tool-use loop.""" + messages = [{"role": "user", "content": req.message}] + + for _ in range(10): # Max 10 tool-use iterations + response = await client.messages.create( + model=os.environ.get("BOT_MODEL", "claude-opus-4-6"), + max_tokens=16384, + system=SYSTEM_PROMPT, + tools=CODE_TOOLS, + messages=messages, + ) + + if response.stop_reason == "tool_use": + tool_blocks = [b for b in response.content if b.type == "tool_use"] + messages.append({"role": "assistant", "content": response.content}) + tool_results = [] + for tool in tool_blocks: + result = await execute_tool(tool.name, tool.input) + tool_results.append( + { + "type": "tool_result", + "tool_use_id": tool.id, + "content": result, + } + ) + messages.append({"role": "user", "content": tool_results}) + else: + text = "".join( + b.text for b in response.content if hasattr(b, "text") + ) + return { + "response": text, + "session_id": req.session_id, + "summary": text[:100], + } + + return { + "response": "Max iterations reached. The code may need manual review.", + "session_id": req.session_id, + } + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "healthy", "agent": "code-generator"} diff --git a/clawdbot/agents/code-generator/requirements.txt b/clawdbot/agents/code-generator/requirements.txt new file mode 100644 index 0000000..6793eba --- /dev/null +++ b/clawdbot/agents/code-generator/requirements.txt @@ -0,0 +1,6 @@ +fastapi>=0.115,<1 +uvicorn[standard]>=0.34,<1 +anthropic>=0.45,<1 +httpx>=0.28,<1 +redis>=5.2,<6 +pydantic>=2.10,<3 diff --git a/clawdbot/agents/orchestrator/Dockerfile b/clawdbot/agents/orchestrator/Dockerfile new file mode 100644 index 0000000..13d3070 --- /dev/null +++ b/clawdbot/agents/orchestrator/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim AS builder +WORKDIR /build +COPY requirements.txt . +RUN pip install --no-cache-dir --target=/deps -r requirements.txt + +FROM python:3.12-slim +RUN groupadd -r agent && useradd -r -g agent -s /bin/bash agent \ + && apt-get update && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY --from=builder /deps /usr/local/lib/python3.12/site-packages +COPY . . +RUN chown -R agent:agent /app +USER agent +ENV PYTHONPATH=/usr/local/lib/python3.12/site-packages +EXPOSE 8000 +HEALTHCHECK --interval=30s --timeout=10s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/clawdbot/agents/orchestrator/app/__init__.py b/clawdbot/agents/orchestrator/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clawdbot/agents/orchestrator/app/main.py b/clawdbot/agents/orchestrator/app/main.py new file mode 100644 index 0000000..86fc6aa --- /dev/null +++ b/clawdbot/agents/orchestrator/app/main.py @@ -0,0 +1,182 @@ +"""ClawdBot Orchestrator Gateway. + +Routes incoming requests to the appropriate specialist agent +(Personal Assistant, Business Advisor, Code Generator) and manages +cross-agent communication via Redis Streams. +""" + +from __future__ import annotations + +import json +import os +from contextlib import asynccontextmanager +from pathlib import Path + +import httpx +import redis.asyncio as redis +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + + +def read_secret(name: str) -> str: + """Read a Docker secret file, falling back to environment variable.""" + path = Path(f"/run/secrets/{name}") + if path.exists(): + return path.read_text().strip() + return os.environ.get(name.upper(), "") + + +AGENT_ROUTES: dict[str, str] = { + "assistant": "http://personal-assistant:8000", + "advisor": "http://business-advisor:8000", + "codegen": "http://code-generator:8000", +} + + +# ── Request / Response models ──────────────────────────────── + + +class ChatRequest(BaseModel): + message: str + agent: str # "assistant" | "advisor" | "codegen" | "auto" + session_id: str | None = None + context: dict | None = None + + +class ChatResponse(BaseModel): + response: str + agent: str + session_id: str + metadata: dict | None = None + + +# ── Application lifespan ───────────────────────────────────── + + +@asynccontextmanager +async def lifespan(app: FastAPI): + redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0") + app.state.redis = redis.from_url(redis_url, decode_responses=True) + app.state.http = httpx.AsyncClient(timeout=120.0) + yield + await app.state.http.aclose() + await app.state.redis.aclose() + + +app = FastAPI(title="ClawdBot Orchestrator", lifespan=lifespan) + + +# ── Intent classification ──────────────────────────────────── + + +INTENT_KEYWORDS: dict[str, list[str]] = { + "assistant": [ + "calendar", "schedule", "remind", "meeting", "appointment", + "briefing", "todo", "task", "email", "notification", + ], + "codegen": [ + "code", "debug", "function", "implement", "refactor", + "test", "programming", "script", "api", "bug", "class", + "module", "deploy", "build", "compile", + ], + "advisor": [ + "market", "strategy", "business", "revenue", "competitor", + "analysis", "forecast", "swot", "pricing", "growth", + "investment", "roi", "kpi", + ], +} + + +async def classify_intent(message: str) -> str: + """Route messages to the right agent using keyword matching. + + For production, replace with an LLM-based classifier that scores + intent confidence across all agents. + """ + msg_lower = message.lower() + scores: dict[str, int] = {agent: 0 for agent in INTENT_KEYWORDS} + + for agent, keywords in INTENT_KEYWORDS.items(): + for keyword in keywords: + if keyword in msg_lower: + scores[agent] += 1 + + best = max(scores, key=scores.get) # type: ignore[arg-type] + if scores[best] > 0: + return best + return "advisor" # Default fallback + + +# ── Routes ─────────────────────────────────────────────────── + + +@app.post("/chat", response_model=ChatResponse) +async def chat(req: ChatRequest): + """Accept a user message and route it to the appropriate agent.""" + if req.agent == "auto": + agent = await classify_intent(req.message) + else: + agent = req.agent + + if agent not in AGENT_ROUTES: + raise HTTPException(status_code=400, detail=f"Unknown agent: {agent}") + + # Forward to specialist agent + try: + resp = await app.state.http.post( + f"{AGENT_ROUTES[agent]}/invoke", + json={ + "message": req.message, + "session_id": req.session_id, + "context": req.context, + }, + ) + resp.raise_for_status() + except httpx.HTTPStatusError as exc: + raise HTTPException( + status_code=502, + detail=f"Agent {agent} returned {exc.response.status_code}", + ) from exc + except httpx.RequestError as exc: + raise HTTPException( + status_code=503, + detail=f"Agent {agent} is unavailable: {exc}", + ) from exc + + data = resp.json() + + # Publish event for cross-agent awareness + await app.state.redis.xadd( + "agent:events", + { + "agent": agent, + "session": req.session_id or "", + "summary": data.get("summary", "")[:200], + }, + ) + + return ChatResponse( + response=data["response"], + agent=agent, + session_id=data.get("session_id", ""), + metadata=data.get("metadata"), + ) + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "healthy", "agents": list(AGENT_ROUTES.keys())} + + +@app.get("/agents") +async def list_agents(): + """List available agents and their status.""" + statuses = {} + for name, url in AGENT_ROUTES.items(): + try: + resp = await app.state.http.get(f"{url}/health", timeout=5.0) + statuses[name] = "healthy" if resp.status_code == 200 else "unhealthy" + except Exception: + statuses[name] = "unavailable" + return {"agents": statuses} diff --git a/clawdbot/agents/orchestrator/requirements.txt b/clawdbot/agents/orchestrator/requirements.txt new file mode 100644 index 0000000..db1b32d --- /dev/null +++ b/clawdbot/agents/orchestrator/requirements.txt @@ -0,0 +1,5 @@ +fastapi>=0.115,<1 +uvicorn[standard]>=0.34,<1 +httpx>=0.28,<1 +redis>=5.2,<6 +pydantic>=2.10,<3 diff --git a/clawdbot/agents/personal-assistant/Dockerfile b/clawdbot/agents/personal-assistant/Dockerfile new file mode 100644 index 0000000..13d3070 --- /dev/null +++ b/clawdbot/agents/personal-assistant/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim AS builder +WORKDIR /build +COPY requirements.txt . +RUN pip install --no-cache-dir --target=/deps -r requirements.txt + +FROM python:3.12-slim +RUN groupadd -r agent && useradd -r -g agent -s /bin/bash agent \ + && apt-get update && apt-get install -y --no-install-recommends curl \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY --from=builder /deps /usr/local/lib/python3.12/site-packages +COPY . . +RUN chown -R agent:agent /app +USER agent +ENV PYTHONPATH=/usr/local/lib/python3.12/site-packages +EXPOSE 8000 +HEALTHCHECK --interval=30s --timeout=10s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/clawdbot/agents/personal-assistant/app/__init__.py b/clawdbot/agents/personal-assistant/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clawdbot/agents/personal-assistant/app/main.py b/clawdbot/agents/personal-assistant/app/main.py new file mode 100644 index 0000000..c0de345 --- /dev/null +++ b/clawdbot/agents/personal-assistant/app/main.py @@ -0,0 +1,265 @@ +"""Personal Assistant Bot. + +Manages calendar, scheduling, reminders, and daily briefings. +Uses Claude Opus 4.6 with Google Calendar tool use. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +from anthropic import AsyncAnthropic +from fastapi import FastAPI +from pydantic import BaseModel + +app = FastAPI(title="Personal Assistant Bot") + + +def read_secret(name: str) -> str: + """Read a Docker secret file, falling back to environment variable.""" + path = Path(f"/run/secrets/{name}") + return path.read_text().strip() if path.exists() else os.environ.get(name.upper(), "") + + +client = AsyncAnthropic(api_key=read_secret("anthropic_key")) + + +# ── Google Calendar tool definitions ───────────────────────── + +CALENDAR_TOOLS = [ + { + "name": "list_events", + "description": "List upcoming calendar events within a date range", + "input_schema": { + "type": "object", + "properties": { + "time_min": { + "type": "string", + "description": "Start time (RFC3339)", + }, + "time_max": { + "type": "string", + "description": "End time (RFC3339)", + }, + "max_results": {"type": "integer", "default": 10}, + }, + "required": ["time_min", "time_max"], + }, + }, + { + "name": "create_event", + "description": "Create a new calendar event", + "input_schema": { + "type": "object", + "properties": { + "summary": {"type": "string"}, + "start": { + "type": "string", + "description": "Start datetime (RFC3339)", + }, + "end": { + "type": "string", + "description": "End datetime (RFC3339)", + }, + "description": {"type": "string", "default": ""}, + "attendees": { + "type": "array", + "items": {"type": "string"}, + "default": [], + }, + }, + "required": ["summary", "start", "end"], + }, + }, + { + "name": "find_free_time", + "description": "Find free time slots in calendar", + "input_schema": { + "type": "object", + "properties": { + "date": { + "type": "string", + "description": "Date to check (YYYY-MM-DD)", + }, + "duration_minutes": {"type": "integer", "default": 60}, + }, + "required": ["date"], + }, + }, + { + "name": "daily_briefing", + "description": "Generate a daily briefing of today's events and priorities", + "input_schema": { + "type": "object", + "properties": {}, + }, + }, +] + + +def get_calendar_service(): + """Initialize Google Calendar API client.""" + try: + from google.oauth2.credentials import Credentials + from googleapiclient.discovery import build + + creds_path = os.environ.get( + "GOOGLE_CREDENTIALS_FILE", "/run/secrets/google_creds" + ) + creds = Credentials.from_authorized_user_file(creds_path) + return build("calendar", "v3", credentials=creds) + except Exception as e: + return None + + +async def execute_tool(name: str, args: dict) -> str: + """Execute calendar tools and return results.""" + service = get_calendar_service() + if service is None: + return json.dumps( + {"error": "Google Calendar not configured. Using mock data."} + ) + + if name == "list_events": + result = ( + service.events() + .list( + calendarId="primary", + timeMin=args["time_min"], + timeMax=args["time_max"], + maxResults=args.get("max_results", 10), + singleEvents=True, + orderBy="startTime", + ) + .execute() + ) + events = result.get("items", []) + return json.dumps( + [ + { + "summary": e.get("summary", "No title"), + "start": e["start"].get("dateTime", e["start"].get("date")), + "end": e["end"].get("dateTime", e["end"].get("date")), + "id": e["id"], + } + for e in events + ] + ) + + elif name == "create_event": + event = { + "summary": args["summary"], + "start": {"dateTime": args["start"], "timeZone": "UTC"}, + "end": {"dateTime": args["end"], "timeZone": "UTC"}, + } + if args.get("description"): + event["description"] = args["description"] + if args.get("attendees"): + event["attendees"] = [{"email": e} for e in args["attendees"]] + result = ( + service.events() + .insert(calendarId="primary", body=event) + .execute() + ) + return json.dumps( + { + "created": True, + "id": result["id"], + "link": result.get("htmlLink", ""), + } + ) + + elif name == "find_free_time": + return json.dumps( + { + "date": args["date"], + "free_slots": [ + {"start": "09:00", "end": "10:00"}, + {"start": "14:00", "end": "15:30"}, + ], + "note": "Placeholder — implement freebusy query for production", + } + ) + + elif name == "daily_briefing": + return json.dumps( + { + "briefing": "Daily briefing placeholder", + "note": "Implement with today's date range query", + } + ) + + return json.dumps({"error": f"Unknown tool: {name}"}) + + +# ── Request model ──────────────────────────────────────────── + + +SYSTEM_PROMPT = ( + "You are a personal assistant managing the user's calendar, " + "scheduling, and daily briefings. Use the calendar tools to " + "check availability, create events, and provide summaries. " + "Always confirm actions before executing them. Be concise and " + "proactive about scheduling conflicts." +) + + +class InvokeRequest(BaseModel): + message: str + session_id: str | None = None + context: dict | None = None + + +# ── Agentic endpoint ───────────────────────────────────────── + + +@app.post("/invoke") +async def invoke(req: InvokeRequest): + """Handle a user message with an agentic tool-use loop.""" + messages = [{"role": "user", "content": req.message}] + + for _ in range(10): # Max tool-use iterations + response = await client.messages.create( + model=os.environ.get("BOT_MODEL", "claude-opus-4-6"), + max_tokens=4096, + system=SYSTEM_PROMPT, + tools=CALENDAR_TOOLS, + messages=messages, + ) + + if response.stop_reason == "tool_use": + tool_blocks = [b for b in response.content if b.type == "tool_use"] + messages.append({"role": "assistant", "content": response.content}) + tool_results = [] + for tool in tool_blocks: + result = await execute_tool(tool.name, tool.input) + tool_results.append( + { + "type": "tool_result", + "tool_use_id": tool.id, + "content": result, + } + ) + messages.append({"role": "user", "content": tool_results}) + else: + text = "".join( + b.text for b in response.content if hasattr(b, "text") + ) + return { + "response": text, + "session_id": req.session_id, + "summary": text[:100], + } + + return { + "response": "Max iterations reached. Please simplify your request.", + "session_id": req.session_id, + } + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "healthy", "agent": "personal-assistant"} diff --git a/clawdbot/agents/personal-assistant/requirements.txt b/clawdbot/agents/personal-assistant/requirements.txt new file mode 100644 index 0000000..eaaeded --- /dev/null +++ b/clawdbot/agents/personal-assistant/requirements.txt @@ -0,0 +1,7 @@ +fastapi>=0.115,<1 +uvicorn[standard]>=0.34,<1 +anthropic>=0.45,<1 +redis>=5.2,<6 +pydantic>=2.10,<3 +google-api-python-client>=2.160,<3 +google-auth>=2.37,<3 diff --git a/clawdbot/agents/sandbox/Dockerfile b/clawdbot/agents/sandbox/Dockerfile new file mode 100644 index 0000000..b056a40 --- /dev/null +++ b/clawdbot/agents/sandbox/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-slim AS builder +WORKDIR /build +COPY requirements.txt . +RUN pip install --no-cache-dir --target=/deps -r requirements.txt + +FROM python:3.12-slim +RUN groupadd -r agent && useradd -r -g agent -s /bin/bash agent \ + && apt-get update && apt-get install -y --no-install-recommends \ + curl nodejs npm \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY --from=builder /deps /usr/local/lib/python3.12/site-packages +COPY . . +RUN mkdir -p /workspace && chown -R agent:agent /app /workspace +USER agent +ENV PYTHONPATH=/usr/local/lib/python3.12/site-packages +EXPOSE 8080 +HEALTHCHECK --interval=30s --timeout=10s --retries=3 \ + CMD curl -f http://localhost:8080/health || exit 1 +CMD ["uvicorn", "app.executor:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/clawdbot/agents/sandbox/app/__init__.py b/clawdbot/agents/sandbox/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clawdbot/agents/sandbox/app/executor.py b/clawdbot/agents/sandbox/app/executor.py new file mode 100644 index 0000000..2a160e7 --- /dev/null +++ b/clawdbot/agents/sandbox/app/executor.py @@ -0,0 +1,89 @@ +"""Code Sandbox Runner. + +Executes user-submitted code in an isolated, read-only container +with strict resource limits and no network access. +""" + +from __future__ import annotations + +import os +import subprocess +import tempfile + +from fastapi import FastAPI +from pydantic import BaseModel + +app = FastAPI(title="Code Sandbox Runner") + +LANGUAGE_CONFIG: dict[str, dict] = { + "python": {"ext": ".py", "cmd": ["python3"]}, + "javascript": {"ext": ".js", "cmd": ["node"]}, + "bash": {"ext": ".sh", "cmd": ["bash"]}, + "go": {"ext": ".go", "cmd": ["go", "run"]}, +} + +# Hard ceiling on execution timeout (seconds) +MAX_TIMEOUT = 60 + + +class ExecRequest(BaseModel): + language: str + code: str + timeout: int = 30 + + +@app.post("/execute") +async def execute(req: ExecRequest): + """Execute code in a temporary file and return stdout/stderr.""" + if req.language not in LANGUAGE_CONFIG: + return { + "error": f"Unsupported language: {req.language}", + "stdout": "", + "stderr": "", + "exit_code": 1, + } + + cfg = LANGUAGE_CONFIG[req.language] + timeout = min(req.timeout, MAX_TIMEOUT) + + with tempfile.NamedTemporaryFile( + mode="w", + suffix=cfg["ext"], + dir="/workspace", + delete=False, + ) as f: + f.write(req.code) + f.flush() + filepath = f.name + + try: + result = subprocess.run( + cfg["cmd"] + [filepath], + capture_output=True, + text=True, + timeout=timeout, + cwd="/workspace", + env={**os.environ, "HOME": "/tmp"}, + ) + return { + "stdout": result.stdout[:10_000], + "stderr": result.stderr[:5_000], + "exit_code": result.returncode, + } + except subprocess.TimeoutExpired: + return { + "stdout": "", + "stderr": "Execution timed out", + "exit_code": 124, + } + finally: + try: + os.unlink(filepath) + except OSError: + pass + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "sandbox_ready"} diff --git a/clawdbot/agents/sandbox/requirements.txt b/clawdbot/agents/sandbox/requirements.txt new file mode 100644 index 0000000..239b0b2 --- /dev/null +++ b/clawdbot/agents/sandbox/requirements.txt @@ -0,0 +1,3 @@ +fastapi>=0.115,<1 +uvicorn[standard]>=0.34,<1 +pydantic>=2.10,<3 diff --git a/clawdbot/docker-compose.yml b/clawdbot/docker-compose.yml new file mode 100644 index 0000000..1a6198b --- /dev/null +++ b/clawdbot/docker-compose.yml @@ -0,0 +1,171 @@ +version: "3.9" + +services: + # ── Reverse Proxy ────────────────────────────────────────── + nginx: + image: nginx:1.27-alpine + ports: + - "80:80" + - "443:443" + volumes: + - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro + - ./nginx/certs:/etc/nginx/certs:ro + depends_on: + orchestrator: + condition: service_healthy + networks: [frontend] + restart: always + + # ── Orchestrator Gateway ─────────────────────────────────── + orchestrator: + build: + context: ./agents/orchestrator + dockerfile: Dockerfile + environment: + - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379/0 + - DATABASE_URL=postgresql://${PG_USER}:${PG_PASS}@postgres:5432/clawdbot + - ANTHROPIC_API_KEY_FILE=/run/secrets/anthropic_key + - MOONSHOT_API_KEY_FILE=/run/secrets/moonshot_key + secrets: [anthropic_key, moonshot_key] + networks: [frontend, backend] + depends_on: + postgres: { condition: service_healthy } + redis: { condition: service_healthy } + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + restart: always + + # ── Personal Assistant Bot ───────────────────────────────── + personal-assistant: + build: + context: ./agents/personal-assistant + dockerfile: Dockerfile + environment: + - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379/1 + - DATABASE_URL=postgresql://${PG_USER}:${PG_PASS}@postgres:5432/clawdbot + - ANTHROPIC_API_KEY_FILE=/run/secrets/anthropic_key + - GOOGLE_CREDENTIALS_FILE=/run/secrets/google_creds + - BOT_MODEL=claude-opus-4-6 + secrets: [anthropic_key, google_creds] + networks: [frontend, backend] + depends_on: + postgres: { condition: service_healthy } + redis: { condition: service_healthy } + restart: always + deploy: + resources: + limits: { cpus: "1.0", memory: 1G } + + # ── Business Advisor Bot ─────────────────────────────────── + business-advisor: + build: + context: ./agents/business-advisor + dockerfile: Dockerfile + environment: + - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379/2 + - DATABASE_URL=postgresql://${PG_USER}:${PG_PASS}@postgres:5432/clawdbot + - MOONSHOT_API_KEY_FILE=/run/secrets/moonshot_key + - ANTHROPIC_API_KEY_FILE=/run/secrets/anthropic_key + - PRIMARY_MODEL=kimi-k2.5 + - FALLBACK_MODEL=claude-opus-4-6 + secrets: [moonshot_key, anthropic_key] + networks: [frontend, backend] + depends_on: + postgres: { condition: service_healthy } + redis: { condition: service_healthy } + restart: always + deploy: + resources: + limits: { cpus: "1.5", memory: 2G } + + # ── Code Generator Bot ──────────────────────────────────── + code-generator: + build: + context: ./agents/code-generator + dockerfile: Dockerfile + environment: + - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379/3 + - DATABASE_URL=postgresql://${PG_USER}:${PG_PASS}@postgres:5432/clawdbot + - ANTHROPIC_API_KEY_FILE=/run/secrets/anthropic_key + - BOT_MODEL=claude-opus-4-6 + secrets: [anthropic_key] + networks: [frontend, backend, sandbox] + depends_on: + postgres: { condition: service_healthy } + redis: { condition: service_healthy } + restart: always + deploy: + resources: + limits: { cpus: "2.0", memory: 2G } + + # ── Code Execution Sandbox ───────────────────────────────── + sandbox-runner: + build: + context: ./agents/sandbox + dockerfile: Dockerfile + networks: [sandbox] + read_only: true + tmpfs: + - /tmp:rw,noexec,nosuid,size=200m + - /workspace:rw,size=100m + cap_drop: [ALL] + security_opt: ["no-new-privileges"] + deploy: + resources: + limits: { cpus: "1.0", memory: 512M } + reservations: { memory: 256M } + + # ── Infrastructure ───────────────────────────────────────── + postgres: + image: pgvector/pgvector:pg17 + environment: + POSTGRES_USER: ${PG_USER} + POSTGRES_PASSWORD_FILE: /run/secrets/pg_password + POSTGRES_DB: clawdbot + volumes: [pgdata:/var/lib/postgresql/data] + networks: [backend] + secrets: [pg_password] + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${PG_USER}"] + interval: 10s + retries: 5 + restart: always + + redis: + image: redis:7-alpine + command: > + redis-server + --appendonly yes + --requirepass ${REDIS_PASSWORD} + --maxmemory 256mb + --maxmemory-policy allkeys-lru + volumes: [redis_data:/data] + networks: [backend] + healthcheck: + test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"] + interval: 10s + retries: 5 + restart: always + +secrets: + anthropic_key: + file: ./secrets/anthropic_key.txt + moonshot_key: + file: ./secrets/moonshot_key.txt + google_creds: + file: ./secrets/google_credentials.json + pg_password: + file: ./secrets/pg_password.txt + +volumes: + pgdata: + redis_data: + +networks: + frontend: + backend: + sandbox: + internal: true # No external access for code execution diff --git a/clawdbot/nginx/certs/.gitkeep b/clawdbot/nginx/certs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/clawdbot/nginx/nginx.conf b/clawdbot/nginx/nginx.conf new file mode 100644 index 0000000..b53e5b5 --- /dev/null +++ b/clawdbot/nginx/nginx.conf @@ -0,0 +1,40 @@ +events { worker_connections 1024; } + +http { + upstream orchestrator { server orchestrator:8000; } + + limit_req_zone $binary_remote_addr zone=api:10m rate=20r/s; + + server { + listen 80; + server_name clawdbot.example.com; + return 301 https://$host$request_uri; + } + + server { + listen 443 ssl; + server_name clawdbot.example.com; + + ssl_certificate /etc/nginx/certs/fullchain.pem; + ssl_certificate_key /etc/nginx/certs/privkey.pem; + ssl_protocols TLSv1.3; + + add_header X-Frame-Options DENY; + add_header X-Content-Type-Options nosniff; + add_header Strict-Transport-Security "max-age=31536000" always; + + location /api/ { + limit_req zone=api burst=40 nodelay; + proxy_pass http://orchestrator/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_read_timeout 120s; + } + + location /health { + proxy_pass http://orchestrator/health; + } + } +} diff --git a/clawdbot/scripts/deploy.sh b/clawdbot/scripts/deploy.sh new file mode 100755 index 0000000..060ec54 --- /dev/null +++ b/clawdbot/scripts/deploy.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# deploy.sh — Deploy ClawdBot to Digital Ocean Droplet +set -euo pipefail + +DROPLET_IP="${1:?Usage: ./deploy.sh }" +DEPLOY_USER="${2:-deployer}" +APP_DIR="/opt/clawdbot" + +echo "Deploying ClawdBot to ${DROPLET_IP}..." + +ssh "${DEPLOY_USER}@${DROPLET_IP}" << 'REMOTE_SCRIPT' +set -euo pipefail +cd /opt/clawdbot + +# Pull latest code +git fetch origin main +git reset --hard origin/main + +# Create secrets directory if needed +mkdir -p secrets + +# Build and deploy with zero-downtime +docker compose build --parallel +docker compose up -d --remove-orphans + +# Wait for health checks +echo "Waiting for services to be healthy..." +sleep 10 +for service in orchestrator personal-assistant business-advisor code-generator; do + until docker compose exec -T "$service" curl -sf http://localhost:8000/health > /dev/null 2>&1; do + echo " Waiting for $service..." + sleep 5 + done + echo " $service is healthy" +done + +# Clean up old images +docker image prune -f + +echo "ClawdBot deployed successfully!" +docker compose ps +REMOTE_SCRIPT diff --git a/clawdbot/scripts/setup-droplet.sh b/clawdbot/scripts/setup-droplet.sh new file mode 100755 index 0000000..126917e --- /dev/null +++ b/clawdbot/scripts/setup-droplet.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# setup-droplet.sh — One-time Digital Ocean Droplet setup +# +# Use a Docker 1-Click Droplet (Ubuntu 24.04 + Docker pre-installed) +# Recommended: $24/mo (4GB RAM, 2 vCPUs) minimum +# Production: $48/mo (8GB RAM, 4 vCPUs) recommended +set -euo pipefail + +# Security hardening +ufw default deny incoming +ufw default allow outgoing +ufw allow ssh +ufw allow 80/tcp +ufw allow 443/tcp +ufw --force enable + +# Install fail2ban and certbot +apt-get update && apt-get install -y fail2ban certbot +systemctl enable fail2ban + +# Create deploy user +useradd -m -s /bin/bash -G docker deployer +mkdir -p /home/deployer/.ssh +cp ~/.ssh/authorized_keys /home/deployer/.ssh/ +chown -R deployer:deployer /home/deployer/.ssh + +# Create app directory +mkdir -p /opt/clawdbot/secrets +chown -R deployer:deployer /opt/clawdbot + +echo "Droplet ready. Next steps:" +echo " 1. Clone the repo: su - deployer -c 'git clone /opt/clawdbot'" +echo " 2. Upload secrets to /opt/clawdbot/secrets/" +echo " 3. Copy .env.example to .env and fill in values" +echo " 4. Run: ./scripts/deploy.sh $(hostname -I | awk '{print $1}')" diff --git a/clawdbot/shared/__init__.py b/clawdbot/shared/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clawdbot/shared/messaging.py b/clawdbot/shared/messaging.py new file mode 100644 index 0000000..0027665 --- /dev/null +++ b/clawdbot/shared/messaging.py @@ -0,0 +1,83 @@ +"""Cross-agent communication via Redis Streams. + +Provides a unified interface for inter-agent task dispatch, +event broadcasting, and status tracking. Used by all agents. +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone + +import redis.asyncio as redis + + +class AgentMessenger: + """Cross-agent communication via Redis Streams.""" + + def __init__(self, redis_url: str, agent_name: str): + self.redis = redis.from_url(redis_url, decode_responses=True) + self.agent_name = agent_name + self.stream = f"agent:tasks:{agent_name}" + self.events = "agent:events" + + async def publish_task(self, target_agent: str, task: dict) -> str: + """Send a task to another agent. Returns the message ID.""" + msg_id = await self.redis.xadd( + f"agent:tasks:{target_agent}", + { + "from": self.agent_name, + "task": json.dumps(task), + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + return msg_id + + async def consume_tasks(self, last_id: str = "$"): + """Listen for incoming tasks (async generator). + + Yields (message_id, task_dict) tuples. + """ + while True: + results = await self.redis.xread( + {self.stream: last_id}, block=5000, count=10 + ) + for _stream_name, messages in results: + for msg_id, data in messages: + yield msg_id, json.loads(data["task"]) + last_id = msg_id + + async def broadcast_event(self, event_type: str, data: dict) -> str: + """Broadcast an event visible to all agents.""" + msg_id = await self.redis.xadd( + self.events, + { + "type": event_type, + "agent": self.agent_name, + "data": json.dumps(data), + "timestamp": datetime.now(timezone.utc).isoformat(), + }, + ) + return msg_id + + async def get_agent_status(self, agent_name: str) -> dict | None: + """Check another agent's last known status.""" + status = await self.redis.hgetall(f"agent:status:{agent_name}") + return status or None + + async def update_status( + self, status: str, details: dict | None = None + ) -> None: + """Update this agent's status hash.""" + await self.redis.hset( + f"agent:status:{self.agent_name}", + mapping={ + "status": status, + "details": json.dumps(details or {}), + "updated": datetime.now(timezone.utc).isoformat(), + }, + ) + + async def close(self) -> None: + """Cleanly close the Redis connection.""" + await self.redis.aclose()