diff --git a/docs/E2E_TEST_SPEC.md b/docs/E2E_TEST_SPEC.md new file mode 100644 index 0000000..4a652cf --- /dev/null +++ b/docs/E2E_TEST_SPEC.md @@ -0,0 +1,831 @@ +# E2E Test Specification + +End-to-end testing strategy for Intuno communication networks using real agent implementations. + +--- + +## Architecture Decision + +**E2E tests live in the agents repo, not here.** + +- E2E tests *are* agent code that consumes Intuno — they belong on the consumer side +- The agents repo already has real agent implementations +- The agents repo gets its own CI pipeline testing against staging Intuno 24/7 +- Intuno repo keeps unit tests (63) and integration tests (`test_networks.py`) — no dependency on external agent processes +- Agent implementations can evolve without touching Intuno's CI + +**This document is the contract.** It specifies what agent types and test scenarios the agents repo should implement. + +--- + +## Reference Agents + +Five agent types are needed to cover all communication patterns. + +### Agent 1: Echo Agent + +The simplest possible agent. Receives any message, returns it with `[ECHO] ` prefix. 
+ +**Capabilities:** +- Webhook-based (`callback_url`) +- Handles all three channels (call, message, mailbox via polling) +- For calls: returns `{"echo": "[ECHO] <content>"}` +- For messages: POSTs to `reply_url` with `[ECHO] <content>` + +**What it tests:** +- Basic webhook delivery (Intuno → agent) +- `reply_url` mechanics (agent → Intuno) +- Call response semantics (synchronous blocking) +- Delivery payload structure validation +- Context presence in deliveries +- Signed URL acceptance + +**Implementation sketch:** + +```python +from fastapi import FastAPI, Request +import httpx + +app = FastAPI() + +@app.post("/webhook") +async def echo(request: Request): + payload = await request.json() + echo_content = f"[ECHO] {payload['content']}" + + if payload["channel"] == "call": + return {"echo": echo_content} + + # For messages: reply via signed callback URL + async with httpx.AsyncClient() as client: + await client.post(payload["reply_url"], json={ + "content": echo_content, + "recipient_participant_id": payload["sender"]["participant_id"], + "channel_type": "message", + }) + return {"status": "ok"} +``` + +--- + +### Agent 2: Conversational Agent + +Maintains conversation state using the `context[]` from delivery payloads. Responds contextually, referencing previous messages. + +**Capabilities:** +- Webhook-based +- Uses the `context` array to build conversation history +- References previous speakers and topics in responses +- Tracks how many messages it has exchanged (via context length) + +**What it tests:** +- Context window accuracy (are all messages present?) 
+- Multi-turn conversation coherence +- Message ordering in context +- Context growth over multiple exchanges +- `message_id` presence in context entries + +**Implementation sketch:** + +```python +@app.post("/webhook") +async def conversational(request: Request): + payload = await request.json() + + # Build a response that proves we have context + context = payload.get("context", []) + context_summary = f"I see {len(context)} messages in history." + if context: + first = context[0] + context_summary += f" First message was from {first['sender']}." + + response = f"Message #{len(context) + 1}: {context_summary} Responding to: {payload['content'][:50]}" + + if payload["channel"] == "call": + return {"response": response, "context_size": len(context)} + + async with httpx.AsyncClient() as client: + await client.post(payload["reply_url"], json={ + "content": response, + "recipient_participant_id": payload["sender"]["participant_id"], + "channel_type": "message", + }) + return {"status": "ok"} +``` + +--- + +### Agent 3: Proactive Agent + +On receiving any message, proactively messages ALL other participants visible in `network_participants[]`. 
+ +**Capabilities:** +- Webhook-based +- Reads `network_participants` to discover all peers +- Sends individual messages to every other participant +- Can be used as a star hub or broadcast coordinator + +**What it tests:** +- Proactive communication (agent-initiated, not just responses) +- Multi-directional message flow (fan-out to multiple recipients) +- Topology enforcement (should fail in star if not hub, should fail in ring if targets wrong peer) +- `network_participants` field accuracy + +**Implementation sketch:** + +```python +@app.post("/webhook") +async def proactive(request: Request): + payload = await request.json() + my_name = "Proactive Agent" # Must match participant name + + if payload["channel"] == "call": + return {"response": "acknowledged"} + + participants = payload.get("network_participants", []) + my_id = next( + (p["participant_id"] for p in participants if p["name"] == my_name), + None, + ) + + async with httpx.AsyncClient() as client: + for p in participants: + if p["participant_id"] == my_id: + continue + await client.post(payload["reply_url"], json={ + "content": f"Proactive message to {p['name']}: I received '{payload['content'][:30]}...'", + "recipient_participant_id": p["participant_id"], + "channel_type": "message", + }) + + return {"status": "ok"} +``` + +--- + +### Agent 4: Polling Agent + +No webhook endpoint. Uses `polling_enabled: true` and checks its inbox on a timer. 
+ +**Capabilities:** +- No `callback_url` — polling only +- Background loop: poll inbox → process → ack → reply via API +- Can handle mailbox channel messages + +**What it tests:** +- Inbox polling flow (`GET /inbox/{pid}`) +- Message acknowledgment (`POST /messages/ack`) +- Mailbox channel (no push delivery) +- Reply via API (not via `reply_url`) +- Unread-only filtering (acknowledged messages don't reappear) + +**Implementation sketch:** + +```python +import asyncio +import httpx + +BASE = "https://api.intuno.net" + +async def polling_loop(network_id: str, participant_id: str, token: str): + headers = {"Authorization": f"Bearer {token}"} + + async with httpx.AsyncClient() as client: + while True: + resp = await client.get( + f"{BASE}/networks/{network_id}/inbox/{participant_id}", + headers=headers, + params={"limit": 50}, + ) + messages = resp.json() + + if messages: + msg_ids = [] + for msg in messages: + # Reply via API + await client.post( + f"{BASE}/networks/{network_id}/messages/send", + headers=headers, + json={ + "sender_participant_id": participant_id, + "recipient_participant_id": msg["sender_participant_id"], + "content": f"[POLL] Received: {msg['content'][:80]}", + }, + ) + msg_ids.append(msg["id"]) + + # Acknowledge all processed messages + await client.post( + f"{BASE}/networks/{network_id}/messages/ack", + headers=headers, + json={"message_ids": msg_ids}, + ) + + await asyncio.sleep(3) +``` + +--- + +### Agent 5: Multi-Channel Agent + +Handles all three channels, responding differently based on channel type. 
+ +**Capabilities:** +- Webhook-based + polling +- Different behavior per channel: + - `call` → returns structured JSON immediately + - `message` → replies via `reply_url` with acknowledgment + - `mailbox` → processes via polling, replies via API + +**What it tests:** +- Channel-specific routing within a single agent +- Mixed channel workflows in the same network +- Call blocking semantics (30s timeout) +- Message/mailbox distinction + +**Implementation sketch:** + +```python +@app.post("/webhook") +async def multi_channel(request: Request): + payload = await request.json() + channel = payload["channel"] + + if channel == "call": + return { + "channel_received": "call", + "response": f"Sync response to: {payload['content'][:50]}", + "processed_at": datetime.utcnow().isoformat(), + } + + if channel == "message": + async with httpx.AsyncClient() as client: + await client.post(payload["reply_url"], json={ + "content": f"[MSG ACK] Received via message channel: {payload['content'][:50]}", + "recipient_participant_id": payload["sender"]["participant_id"], + "channel_type": "message", + }) + + # Mailbox messages: no immediate response (processed via polling) + return {"status": "ok", "channel_received": channel} +``` + +--- + +## E2E Test Scenarios + +### Scenario 1: Basic Echo Round-Trip + +**Agents:** Echo + API caller (test script) +**Network:** mesh + +``` +Test script → POST /messages/send (to Echo) + → Intuno delivers to Echo's webhook + → Echo POSTs to reply_url with [ECHO] prefix + → Test script polls /inbox and finds Echo's reply +``` + +**Assertions:** +- Original message appears in `/messages` list +- Echo's reply appears in caller's inbox +- Reply content starts with `[ECHO] ` +- Both messages appear in `/context` with correct senders +- Context entry includes `message_id` + +--- + +### Scenario 2: Multi-Turn Conversation + +**Agents:** Conversational x 2 +**Network:** mesh + +``` +Agent A receives trigger via API + → A messages B (via /messages/send) + → B 
receives with context, replies referencing context + → A receives B's reply with updated context + → Repeat for 5 turns +``` + +**Assertions:** +- Context grows with each exchange (1, 2, 3, 4, 5... messages) +- Each agent sees the full conversation in `context[]` +- Messages are ordered chronologically +- `context_size` in responses matches expected count +- All `message_id` fields are present and unique + +--- + +### Scenario 3: Proactive Initiation + +**Agents:** Proactive + Echo +**Network:** mesh + +``` +Test script sends one message to Proactive + → Proactive receives and messages Echo (proactively) + → Echo replies to Proactive (Proactive must not fan out again on this reply, or the two agents ping-pong indefinitely) + → Test script verifies 3 messages in context (trigger, proactive, echo reply) +``` + +**Assertions:** +- Proactive agent successfully sends to a participant other than the original sender +- Echo receives the proactive message +- Context shows messages from 3 different senders (test, proactive, echo) +- `in_reply_to_id` is null on the proactive message (new conversation) + +--- + +### Scenario 4: Star Topology Hub-Spoke + +**Agents:** Proactive (as hub) + Echo x 2 (as spokes) +**Network:** star + +``` +Hub sends to Spoke A → succeeds +Hub sends to Spoke B → succeeds +Spoke A tries to send to Spoke B → 400 (topology violation) +Spoke A sends to Hub → succeeds (reply to hub) +``` + +**Assertions:** +- Hub (first participant) can message any spoke +- Spoke-to-spoke communication is blocked with 400 +- Spokes can reply to hub via `reply_url` +- Error message mentions "hub" + +--- + +### Scenario 5: Ring Topology Chain + +**Agents:** Echo x 3 (A, B, C) +**Network:** ring + +``` +A → B → C → A (each forwards to next) +A ✗→ C (skip B, should fail with 400) +``` + +**Assertions:** +- A can send to B (next in order) +- B can send to C (next in order) +- C can send to A (wraps around) +- A cannot send to C (skipping B) — returns 400 +- Error message mentions "Ring topology" + +--- + +### Scenario 6: Polling Workflow + +**Agents:** Polling + Echo 
+**Network:** mesh + +``` +Test script sends message via /mailbox to Polling Agent + → Polling Agent polls /inbox and finds it + → Polling Agent acks the message + → Polling Agent replies via /messages/send + → Verify acked messages don't reappear in inbox +``` + +**Assertions:** +- Mailbox message appears in inbox +- After ack, message does not appear in next inbox poll +- Polling agent can reply via API (not reply_url) +- Both messages appear in context + +--- + +### Scenario 7: Mixed Channels + +**Agents:** Multi-Channel x 2 +**Network:** mesh + +``` +Agent A calls Agent B (synchronous) → gets immediate response +Agent A messages Agent B (async) → B replies via reply_url +Agent A sends to B's mailbox → B polls and processes +``` + +**Assertions:** +- Call returns synchronous response within timeout +- Message delivery triggers webhook +- Mailbox message only appears via inbox polling +- All three channel types recorded in context with correct `channel` field +- Context entries distinguish between "call", "message", "mailbox" + +--- + +### Scenario 8: Delivery Retry + +**Agents:** Echo (starts offline, comes online after 5s) +**Network:** mesh + +``` +Send message to Echo while it's offline + → Delivery fails → enqueued for retry + → Echo comes online after 5s + → Delivery worker retries → succeeds +``` + +**Assertions:** +- Message status is `pending` initially +- After retry succeeds, message status becomes `delivered` +- Echo receives the message (eventually) +- No duplicate deliveries + +**Note:** This test requires the ability to start/stop the Echo agent process. 
+ +--- + +### Scenario 9: 5-Agent Mesh Conversation + +**Agents:** Conversational x 5 +**Network:** mesh + +``` +A → B, B → C, C → D, D → E, E → A + → Verify all 5 messages in context + → Send 5 more (A → C, B → D, C → E, D → A, E → B) + → Verify context has 10 messages, all senders present +``` + +**Assertions:** +- Context grows correctly with many participants +- All 5 participant names appear in context +- `network_participants` in deliveries lists all 5 +- No message loss at scale +- Context window limits respected (max 30 in deliveries) + +--- + +### Scenario 10: Cross-Network Isolation + +**Agents:** Echo x 2 in Network A, Echo x 2 in Network B +**Networks:** mesh x 2 + +``` +Send messages in Network A +Send messages in Network B + → Verify Network A context has only Network A messages + → Verify Network B context has only Network B messages +``` + +**Assertions:** +- Messages don't leak between networks +- Context is network-scoped +- Same agent can participate in multiple networks independently + +--- + +### Scenario 11: Participant Lifecycle + +**Agents:** Echo x 3 (A, B, C) +**Network:** mesh + +``` +A, B, C join and exchange messages + → Remove B (DELETE /participants/{B_id}) + → A sends to C → succeeds + → A sends to B → fails (participant removed) + → Verify B's old messages still in context + → Add D as replacement → communication resumes +``` + +**Assertions:** +- Removed participant cannot receive new messages +- Sending to removed participant returns 400 +- Historical messages from removed participant persist in context +- New participant can see old context after joining + +--- + +### Scenario 12: A2A Interop + +**Agents:** A2A-compatible agent (serves `/.well-known/agent.json`) +**Network:** mesh + +``` +Import A2A agent: POST /a2a/agents/import {"url": "..."} + → Verify agent appears in registry + → Add imported agent to a network + → Send message via A2A tasks/send endpoint + → Verify response in A2A JSON-RPC format +``` + +**Assertions:** +- 
Agent Card is fetched successfully +- Agent is indexed in registry with `a2a` tag +- A2A task send produces valid JSON-RPC response +- Agent can participate in normal network communication + +--- + +## 24/7 Monitoring Strategy + +### Architecture + +``` +Agents Repo +├── agents/ +│ ├── echo_agent.py +│ ├── conversational_agent.py +│ ├── proactive_agent.py +│ ├── polling_agent.py +│ └── multi_channel_agent.py +├── tests/ +│ ├── e2e/ +│ │ ├── conftest.py # Fixtures: auth, network setup, cleanup +│ │ ├── test_echo_roundtrip.py # Scenario 1 +│ │ ├── test_multi_turn.py # Scenario 2 +│ │ ├── test_proactive.py # Scenario 3 +│ │ ├── test_topology_star.py # Scenario 4 +│ │ ├── test_topology_ring.py # Scenario 5 +│ │ ├── test_polling.py # Scenario 6 +│ │ ├── test_mixed_channels.py # Scenario 7 +│ │ ├── test_delivery_retry.py # Scenario 8 +│ │ ├── test_scale_mesh.py # Scenario 9 +│ │ ├── test_isolation.py # Scenario 10 +│ │ ├── test_lifecycle.py # Scenario 11 +│ │ └── test_a2a.py # Scenario 12 +│ └── health/ +│ └── smoke.py # Quick 30-second health check +├── docker-compose.yml # Brings up Intuno + all agents +└── .github/workflows/ + ├── e2e.yml # Full suite (hourly) + └── health.yml # Smoke test (every 5 minutes) +``` + +### Health Check (Every 5 Minutes) + +A quick smoke test that validates the core path: + +```python +async def health_check(): + """30-second smoke test. 
Create network → send message → verify → cleanup.""" + token = await login() + + # Create network + network = await create_network(token, "health-check") + + # Add 2 polling participants + alice = await join_network(token, network["id"], "Alice", polling=True) + bob = await join_network(token, network["id"], "Bob", polling=True) + + # Send message Alice → Bob + await send_message(token, network["id"], alice["id"], bob["id"], "health ping") + + # Verify Bob's inbox + inbox = await get_inbox(token, network["id"], bob["id"]) + assert len(inbox) >= 1 + assert "health ping" in inbox[0]["content"] + + # Verify context + context = await get_context(token, network["id"]) + assert len(context["entries"]) >= 1 + + # Cleanup + await delete_network(token, network["id"]) +``` + +### Full E2E Suite (Hourly) + +Run all 12 scenarios against the live platform: + +```yaml +# .github/workflows/e2e.yml +name: E2E Tests +on: + schedule: + - cron: '0 * * * *' # Every hour + workflow_dispatch: {} + +jobs: + e2e: + runs-on: ubuntu-latest + services: + # Agents run as sidecar containers + echo-agent: + image: your-registry/echo-agent:latest + ports: ['8001:8000'] + conversational-agent: + image: your-registry/conversational-agent:latest + ports: ['8002:8000'] + proactive-agent: + image: your-registry/proactive-agent:latest + ports: ['8003:8000'] + multi-channel-agent: + image: your-registry/multi-channel-agent:latest + ports: ['8004:8000'] + + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: { python-version: '3.12' } + - run: pip install -e ".[test]" + - run: pytest tests/e2e/ -v --tb=short + env: + INTUNO_BASE_URL: ${{ secrets.STAGING_URL }} + INTUNO_EMAIL: ${{ secrets.E2E_EMAIL }} + INTUNO_PASSWORD: ${{ secrets.E2E_PASSWORD }} + ECHO_AGENT_URL: http://echo-agent:8000 + CONVERSATIONAL_AGENT_URL: http://conversational-agent:8000 + PROACTIVE_AGENT_URL: http://proactive-agent:8000 + MULTI_CHANNEL_AGENT_URL: http://multi-channel-agent:8000 +``` + +### Critical 
Path Tests (Every 15 Minutes) + +Run scenarios 1, 2, and 3 more frequently since they cover the core communication loop: + +```yaml +name: Critical Path +on: + schedule: + - cron: '*/15 * * * *' + +jobs: + critical: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: pip install -e ".[test]" + - run: pytest tests/e2e/test_echo_roundtrip.py tests/e2e/test_multi_turn.py tests/e2e/test_proactive.py -v +``` + +### Alerting + +- **On failure**: Retry once before alerting (avoid false positives from transient network issues) +- **Alert channels**: Slack webhook + email +- **Include**: Test name, failure message, timestamp, link to run +- **Track metrics**: Latency per scenario, delivery success rate, context accuracy, over time + +### Local Development + +Docker Compose for running the full stack locally: + +```yaml +# docker-compose.yml (in agents repo) +services: + # Intuno platform + intuno: + build: ../intuno + ports: ['8000:8000'] + depends_on: [postgres, redis, qdrant] + environment: + DATABASE_URL: postgresql+asyncpg://postgres:postgres@postgres:5432/intuno + REDIS_URL: redis://redis:6379/0 + QDRANT_URL: http://qdrant:6333 + + # Infrastructure + postgres: + image: pgvector/pgvector:pg16 + environment: { POSTGRES_DB: intuno, POSTGRES_PASSWORD: postgres } + redis: + image: redis:7-alpine + qdrant: + image: qdrant/qdrant:latest + + # Test agents + echo-agent: + build: ./agents/echo + ports: ['8001:8000'] + conversational-agent: + build: ./agents/conversational + ports: ['8002:8000'] + proactive-agent: + build: ./agents/proactive + ports: ['8003:8000'] + multi-channel-agent: + build: ./agents/multi_channel + ports: ['8004:8000'] +``` + +Run E2E tests locally: + +```bash +docker compose up -d +sleep 10 # Wait for services +pytest tests/e2e/ -v +docker compose down +``` + +--- + +## Test Fixtures (conftest.py) + +Common fixtures for the E2E test suite: + +```python +import os +import uuid + +import httpx +import pytest + +BASE_URL = 
os.getenv("INTUNO_BASE_URL", "http://localhost:8000") + + +@pytest.fixture +async def client(): + async with httpx.AsyncClient(timeout=30) as c: + yield c + + +@pytest.fixture +async def auth_token(client): + """Register a unique test user and return JWT token.""" + email = f"e2e-{uuid.uuid4().hex[:8]}@test.local" + await client.post(f"{BASE_URL}/auth/register", json={ + "email": email, + "password": "TestPass123!", + "first_name": "E2E", + "last_name": "Test", + }) + resp = await client.post(f"{BASE_URL}/auth/login", json={ + "email": email, + "password": "TestPass123!", + }) + return resp.json()["access_token"] + + +@pytest.fixture +async def network(client, auth_token): + """Create a test network and clean up after.""" + headers = {"Authorization": f"Bearer {auth_token}"} + resp = await client.post( + f"{BASE_URL}/networks", + headers=headers, + json={"name": f"e2e-test-{uuid.uuid4().hex[:8]}", "topology_type": "mesh"}, + ) + network_data = resp.json() + yield network_data + await client.delete(f"{BASE_URL}/networks/{network_data['id']}", headers=headers) + + +async def add_participant(client, auth_token, network_id, name, callback_url=None, polling=False): + """Helper to add a participant to a network.""" + headers = {"Authorization": f"Bearer {auth_token}"} + body = {"name": name, "polling_enabled": polling} + if callback_url: + body["callback_url"] = callback_url + resp = await client.post( + f"{BASE_URL}/networks/{network_id}/participants", + headers=headers, + json=body, + ) + assert resp.status_code == 201 + return resp.json() + + +async def send_message(client, auth_token, network_id, sender_id, recipient_id, content): + """Helper to send a message.""" + headers = {"Authorization": f"Bearer {auth_token}"} + resp = await client.post( + f"{BASE_URL}/networks/{network_id}/messages/send", + headers=headers, + json={ + "sender_participant_id": sender_id, + "recipient_participant_id": recipient_id, + "content": content, + }, + ) + assert resp.status_code == 201 
+ return resp.json() + + +async def get_inbox(client, auth_token, network_id, participant_id, limit=50): + """Helper to poll inbox.""" + headers = {"Authorization": f"Bearer {auth_token}"} + resp = await client.get( + f"{BASE_URL}/networks/{network_id}/inbox/{participant_id}", + headers=headers, + params={"limit": limit}, + ) + return resp.json() + + +async def get_context(client, auth_token, network_id, limit=50): + """Helper to get network context.""" + headers = {"Authorization": f"Bearer {auth_token}"} + resp = await client.get( + f"{BASE_URL}/networks/{network_id}/context", + headers=headers, + params={"limit": limit}, + ) + return resp.json() +``` + +--- + +## Cleanup Strategy + +Every E2E test must clean up after itself to prevent state pollution: + +1. Each test creates a **unique network** (random name) +2. Each test registers a **unique user** (random email) +3. After assertions: `DELETE /networks/{id}` removes all participants and messages +4. Use pytest fixtures with `yield` for automatic cleanup on failure + +**Never reuse networks or participants across tests.** Each test is fully isolated. diff --git a/docs/INTEGRATION_GUIDE.md b/docs/INTEGRATION_GUIDE.md new file mode 100644 index 0000000..ab12cd0 --- /dev/null +++ b/docs/INTEGRATION_GUIDE.md @@ -0,0 +1,711 @@ +# Building Agents on Intuno Networks + +How to build external agents that participate in Intuno's multi-directional communication networks. + +--- + +## Quick Start + +### 1. Authenticate + +```bash +# Register +curl -X POST https://api.intuno.net/auth/register \ + -H "Content-Type: application/json" \ + -d '{"email": "you@example.com", "password": "YourPass123!", "first_name": "Dev", "last_name": "Agent"}' + +# Login → get JWT token +TOKEN=$(curl -s -X POST https://api.intuno.net/auth/login \ + -H "Content-Type: application/json" \ + -d '{"email": "you@example.com", "password": "YourPass123!"}' | jq -r '.access_token') +``` + +### 2. 
Register Your Agent + +```bash +curl -X POST https://api.intuno.net/registry/agents \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "My Agent", + "description": "Analyzes data and responds with insights", + "endpoint": "https://my-agent.example.com", + "tags": ["analysis", "data"] + }' +``` + +Save the returned `id` and `agent_id`. + +### 3. Create a Network + +```bash +curl -X POST https://api.intuno.net/networks \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"name": "My Analysis Network", "topology_type": "mesh"}' +``` + +### 4. Join as a Participant + +```bash +# Webhook-based agent (Intuno pushes messages to your endpoint) +curl -X POST https://api.intuno.net/networks/$NETWORK_ID/participants \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "My Agent", + "agent_id": "'$AGENT_DB_ID'", + "participant_type": "agent", + "callback_url": "https://my-agent.example.com/webhook" + }' +``` + +### 5. Send a Message + +```bash +curl -X POST https://api.intuno.net/networks/$NETWORK_ID/messages/send \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "sender_participant_id": "'$MY_PARTICIPANT_ID'", + "recipient_participant_id": "'$OTHER_PARTICIPANT_ID'", + "content": "Can you analyze dataset X?" 
+ }' +``` + +### Minimal Webhook Agent (Python) + +The simplest agent that can participate in Intuno networks: + +```python +from fastapi import FastAPI, Request + +app = FastAPI() + +@app.post("/webhook") +async def handle_intuno_message(request: Request): + payload = await request.json() + + content = payload["content"] + sender = payload["sender"]["name"] + reply_url = payload["reply_url"] + channel = payload["channel"] + + # For calls: return response directly (caller is waiting) + if channel == "call": + return {"response": f"Received from {sender}: {content}"} + + # For messages: POST back to reply_url asynchronously + import httpx + async with httpx.AsyncClient() as client: + await client.post(reply_url, json={ + "content": f"Got it, {sender}. Processing your request.", + "recipient_participant_id": payload["sender"]["participant_id"], + "channel_type": "message", + }) + + return {"status": "ok"} +``` + +--- + +## Communication Patterns + +### Pattern A: Webhook Agent (Push) + +Your agent receives messages via HTTP POST to its `callback_url`. This is the primary pattern for real-time agents. + +**Setup:** Register the participant with a `callback_url`. 
+ +```json +{ + "name": "My Webhook Agent", + "callback_url": "https://my-agent.example.com/webhook", + "participant_type": "agent" +} +``` + +**Receiving messages:** Intuno POSTs the delivery payload to your `callback_url`: + +```json +{ + "network_id": "550e8400-e29b-41d4-a716-446655440000", + "message_id": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", + "channel": "message", + "sender": { + "participant_id": "6ba7b811-9dad-11d1-80b4-00c04fd430c8", + "name": "Research Agent" + }, + "content": "I found 3 relevant papers on the topic.", + "context": [ + { + "sender": "User Persona", + "recipient": "Research Agent", + "channel": "message", + "content": "Find papers about multi-agent coordination", + "message_id": "prev-msg-uuid", + "timestamp": 1712000000.0 + }, + { + "sender": "Research Agent", + "recipient": "User Persona", + "channel": "message", + "content": "I found 3 relevant papers on the topic.", + "message_id": "curr-msg-uuid", + "timestamp": 1712000060.0 + } + ], + "reply_url": "https://api.intuno.net/networks/550e.../participants/6ba7.../callback?sig=a1b2c3...&exp=1712086400", + "network_participants": [ + {"participant_id": "6ba7b811-...", "name": "Research Agent"}, + {"participant_id": "6ba7b812-...", "name": "Analyst Agent"}, + {"participant_id": "6ba7b813-...", "name": "User Persona"} + ] +} +``` + +**Key fields:** + +| Field | Description | +|-------|-------------| +| `content` | The message text (max 65,536 characters) | +| `channel` | `"call"`, `"message"`, or `"mailbox"` | +| `context` | Last ~20-30 messages in the network (conversation history) | +| `reply_url` | HMAC-signed URL to POST replies back (valid 24 hours) | +| `network_participants` | All active participants (for visibility / routing decisions) | +| `sender.participant_id` | Who sent this message | +| `message_id` | Unique ID for this specific message | + +**Responding to calls:** For `channel: "call"`, the caller is blocking and waiting for your HTTP response body. 
Return JSON directly: + +```python +@app.post("/webhook") +async def handle(request: Request): + payload = await request.json() + if payload["channel"] == "call": + # Caller blocks for up to 30 seconds waiting for this + return {"analysis": "Dataset X has 3 anomalies", "confidence": 0.92} +``` + +**Responding to messages:** For `channel: "message"`, POST back to the `reply_url`: + +```python +@app.post("/webhook") +async def handle(request: Request): + payload = await request.json() + if payload["channel"] == "message": + # Process asynchronously, then reply + async with httpx.AsyncClient() as client: + await client.post(payload["reply_url"], json={ + "content": "Analysis complete. Found 3 anomalies.", + "recipient_participant_id": payload["sender"]["participant_id"], + "channel_type": "message", + }) + return {"status": "ok"} +``` + +### Pattern B: Polling Agent (Pull) + +Your agent does not expose an HTTP endpoint. Instead, it periodically polls Intuno for new messages. + +**Setup:** Register with `polling_enabled: true` and no `callback_url`. + +```json +{ + "name": "My Polling Agent", + "polling_enabled": true, + "participant_type": "agent" +} +``` + +**Polling loop:** + +```python +import asyncio +import httpx + +BASE = "https://api.intuno.net" +HEADERS = {"Authorization": f"Bearer {TOKEN}"} + +async def polling_loop(network_id: str, participant_id: str): + async with httpx.AsyncClient() as client: + while True: + # 1. Check inbox for unread messages + resp = await client.get( + f"{BASE}/networks/{network_id}/inbox/{participant_id}", + headers=HEADERS, + params={"limit": 50}, + ) + messages = resp.json() + + if messages: + msg_ids = [] + for msg in messages: + # 2. Process each message + await process_message(client, network_id, participant_id, msg) + msg_ids.append(msg["id"]) + + # 3. 
Acknowledge processed messages (marks as read) + await client.post( + f"{BASE}/networks/{network_id}/messages/ack", + headers=HEADERS, + json={"message_ids": msg_ids}, + ) + + await asyncio.sleep(5) # Poll every 5 seconds + +async def process_message(client, network_id, participant_id, msg): + """Process a single message and reply.""" + await client.post( + f"{BASE}/networks/{network_id}/messages/send", + headers=HEADERS, + json={ + "sender_participant_id": participant_id, + "recipient_participant_id": msg["sender_participant_id"], + "content": f"Received: {msg['content'][:100]}", + }, + ) +``` + +**Important:** The inbox only returns unread messages where your participant is the recipient. After processing, call `/messages/ack` to prevent re-delivery on the next poll. + +### Pattern C: Hybrid Agent + +Combine both patterns. Your agent receives push deliveries via webhook but can also poll for messages that may have been missed (e.g., during downtime). + +```json +{ + "name": "Resilient Agent", + "callback_url": "https://my-agent.example.com/webhook", + "polling_enabled": true, + "participant_type": "agent" +} +``` + +This is recommended for production agents that need reliability. 
+ +--- + +## Channel Types + +### Call (Synchronous, Blocking) + +``` +POST /networks/{network_id}/call +``` + +- Caller blocks until the recipient responds (up to 30 seconds) +- Recipient **must** have a `callback_url` — calls cannot target polling-only agents +- Recipient returns their response as the HTTP response body +- Both the outgoing call and the response are recorded in the network context +- Use for: direct questions, tool invocations, real-time decisions + +```bash +curl -X POST https://api.intuno.net/networks/$NID/call \ + -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "sender_participant_id": "'$SENDER_ID'", + "recipient_participant_id": "'$RECIPIENT_ID'", + "content": "What is the sentiment of this text: Hello world" + }' +``` + +Response: +```json +{ + "success": true, + "message_id": "uuid", + "response": {"sentiment": "positive", "score": 0.95} +} +``` + +### Message (Async, Webhook Push) + +``` +POST /networks/{network_id}/messages/send +``` + +- Non-blocking — returns immediately after recording +- Intuno attempts webhook delivery to the recipient's `callback_url` +- If delivery fails, the message is enqueued for retry (up to 3 attempts with exponential backoff) +- Recipient replies via the signed `reply_url` in the delivery payload +- Use for: conversational exchanges, collaborative workflows, notifications + +Response (201): +```json +{ + "id": "uuid", + "network_id": "uuid", + "sender_participant_id": "uuid", + "recipient_participant_id": "uuid", + "channel_type": "message", + "content": "Your message text", + "status": "pending", + "created_at": "2025-01-15T10:30:00Z" +} +``` + +### Mailbox (Fully Async, Polling Only) + +``` +POST /networks/{network_id}/mailbox +``` + +- No push delivery attempted — message is just stored +- Recipient must poll `GET /networks/{id}/inbox/{pid}` to retrieve it +- Use for: batch processing, non-urgent coordination, offline agents +- Same request/response shape as message, 
but `channel_type: "mailbox"` + +--- + +## Proactive Communication + +Agents can **initiate** conversations, not just respond. This is the key feature that separates networks from traditional request-response patterns. + +### How It Works + +Every delivery payload includes a signed `reply_url`. Your agent can POST to this URL at any time (within 24 hours) to send messages to **any** participant in the network. + +```python +# Agent received a delivery with a reply_url +reply_url = payload["reply_url"] +participants = payload["network_participants"] + +# Proactively message another participant (not the sender) +target = next(p for p in participants if p["name"] == "Manager Agent") + +async with httpx.AsyncClient() as client: + await client.post(reply_url, json={ + "content": "Alert: anomaly detected in dataset X. Confidence: 0.95", + "recipient_participant_id": target["participant_id"], + "channel_type": "message", + }) +``` + +### Callback Payload + +When POSTing to a `reply_url`, send: + +```json +{ + "content": "Your message text (required, max 65536 chars)", + "recipient_participant_id": "uuid (optional — target a specific participant)", + "channel_type": "message", + "metadata": {"key": "value"}, + "in_reply_to_id": "uuid (optional — link to a previous message)" +} +``` + +| Field | Required | Description | +|-------|----------|-------------| +| `content` | Yes | Message text | +| `recipient_participant_id` | No | Target participant. If set, message is forwarded to them. | +| `channel_type` | No | `"call"`, `"message"`, or `"mailbox"`. 
Default: `"message"` | +| `metadata` | No | Arbitrary key-value pairs | +| `in_reply_to_id` | No | UUID of a previous message this replies to | + +### Proactive Agent Example + +A monitoring agent that watches for anomalies and alerts all other participants: + +```python +@app.post("/webhook") +async def handle(request: Request): + payload = await request.json() + + # Store the reply_url for later proactive use + store_reply_url(payload["network_id"], payload["reply_url"]) + + # Check for anomalies (your logic here) + anomalies = detect_anomalies(payload["content"]) + + if anomalies: + # Alert every other participant in the network + my_id = None # Determine from participant list + for p in payload["network_participants"]: + if p["name"] == "Monitoring Agent": + my_id = p["participant_id"] + continue + + async with httpx.AsyncClient() as client: + await client.post(payload["reply_url"], json={ + "content": f"ALERT: {len(anomalies)} anomalies detected", + "recipient_participant_id": p["participant_id"], + "channel_type": "message", + }) + + return {"status": "ok"} +``` + +--- + +## Callback Authentication + +Reply URLs are HMAC-SHA256 signed. You do **not** need to compute signatures yourself. + +### How It Works + +1. When Intuno delivers a message, the `reply_url` already contains the signature: + ``` + https://api.intuno.net/networks/{id}/participants/{pid}/callback?sig=a1b2c3...&exp=1712086400 + ``` + +2. Your agent just POSTs to the URL as-is. Intuno validates the `sig` and `exp` parameters server-side. + +3. Signatures expire after **24 hours** by default. If you try to use an expired `reply_url`, you'll get a `403 Forbidden`. + +### What If My reply_url Expires? + +If your agent needs to send messages after the 24-hour window: + +1. **Poll the inbox** — `GET /networks/{id}/inbox/{pid}` returns messages with fresh context +2. **Send via the API** — Use `POST /networks/{id}/messages/send` with your JWT token (no reply_url needed) +3. 
**Wait for a new delivery** — The next incoming message will include a fresh `reply_url` + +### Security Notes + +- Signatures are computed as `HMAC-SHA256(network_id:participant_id:expiry, secret)` +- The secret is the platform's `JWT_SECRET_KEY` — your agent never needs it +- Expired or tampered signatures are rejected with `403 Forbidden` +- Each `reply_url` is scoped to a specific participant in a specific network + +--- + +## Context Window + +Every delivery includes a `context` array — the most recent messages exchanged in the network (up to 30 by default). This is your agent's conversation history. + +```json +{ + "context": [ + { + "sender": "Alice", + "recipient": "Bob", + "channel": "message", + "content": "Can you review the proposal?", + "message_id": "uuid-1", + "timestamp": 1712000000.0 + }, + { + "sender": "Bob", + "recipient": "Alice", + "channel": "message", + "content": "Sure, sending my feedback now.", + "message_id": "uuid-2", + "timestamp": 1712000060.0 + } + ] +} +``` + +### Key Points + +- Context is **per-network**, not per-participant — your agent sees all messages in the network, even those between other participants +- Context includes up to 30 recent messages by default +- Context is stored in Redis Streams with a 7-day TTL +- The `message_id` field lets you correlate context entries with specific database records +- You can also fetch context explicitly: `GET /networks/{id}/context?limit=50` + +### Using Context in Your Agent + +```python +@app.post("/webhook") +async def handle(request: Request): + payload = await request.json() + + # Build conversation history for your LLM + history = [] + for entry in payload["context"]: + history.append({ + "role": "user" if entry["sender"] != "My Agent" else "assistant", + "content": f"[{entry['sender']}]: {entry['content']}" + }) + + # Add the current message + history.append({ + "role": "user", + "content": f"[{payload['sender']['name']}]: {payload['content']}" + }) + + # Pass to your LLM with full context + response 
= await call_llm(history) + return {"response": response} +``` + +--- + +## Topology Constraints + +Networks enforce communication rules based on their topology type. + +### mesh (default) + +Any participant can communicate with any other. No restrictions. + +### star + +Only the **hub** (first participant to join) can initiate communication. Spokes can only reply to the hub. + +``` +Hub ←→ Spoke A +Hub ←→ Spoke B +Hub ←→ Spoke C +Spoke A ✗→ Spoke B (blocked) +``` + +Use for: centralized coordination, orchestrator patterns, approval workflows. + +### ring + +Messages flow sequentially. Each participant can only message the **next** participant in join order. Wraps around at the end. + +``` +A → B → C → A (circular) +A ✗→ C (skipping B is blocked) +``` + +Use for: pipeline processing, sequential review chains, assembly lines. + +### custom + +No enforcement. The application manages routing externally. + +### Topology Validation Errors + +If a participant tries to communicate in violation of the topology: + +```json +{ + "detail": "Star topology: only the hub can initiate communication" +} +``` + +Status: `400 Bad Request` + +--- + +## Network Lifecycle + +### States + +**Network states:** `active`, `paused`, `closed` +- Only `active` networks accept new messages +- Paused networks preserve data but reject communication + +**Participant states:** `active`, `disconnected`, `removed` +- Only `active` participants can send/receive +- `removed` participants are gone permanently + +### Full Lifecycle + +``` +1. POST /networks → Create network (active) +2. POST /networks/{id}/participants → Add participants +3. ...communicate via channels... +4. PATCH /networks/{id}/participants/{pid} → Update callback_url, capabilities +5. DELETE /networks/{id}/participants/{pid} → Remove a participant +6. PATCH /networks/{id} → Pause/update network +7. DELETE /networks/{id} → Delete network (cascades) +``` + +Messages are preserved even after a participant is removed. 
The participant simply cannot send or receive new messages. + +--- + +## Error Handling + +| Status | Cause | Example | +|--------|-------|---------| +| `400` | Validation error | Bad topology type, missing required fields, content > 65,536 chars, topology violation, inactive network/participant | +| `403` | Authorization failure | Expired/invalid callback signature, user doesn't own the network | +| `404` | Not found | Network, participant, or message doesn't exist | +| `409` | Conflict | Duplicate agent in same network | + +### Common Pitfalls + +1. **Expired reply_url**: Signatures are valid for 24 hours. If you get 403 on a callback, your URL has expired or its `sig`/`exp` parameters were altered. Use the API directly or wait for a fresh delivery. + +2. **Wrong network owner**: All channel operations verify the JWT user owns the network. You can't send messages in someone else's network via the API. + +3. **Topology violations**: In `star` networks, only the hub can initiate. In `ring`, you can only message the next participant. Violations return 400. + +4. **Callback URL requirements**: In production, callback URLs must be HTTPS and cannot point to private IP ranges (10.x, 172.16.x, 192.168.x, 127.x, 169.254.x). This prevents SSRF attacks. + +5. **Content size**: Messages are capped at 65,536 characters. Larger payloads are rejected with 400. 
+ +--- + +## API Reference + +### Network Management + +| Method | Path | Auth | Description | +|--------|------|------|-------------| +| `POST` | `/networks` | JWT | Create a network | +| `GET` | `/networks` | JWT | List your networks | +| `GET` | `/networks/{id}` | JWT | Get network details | +| `PATCH` | `/networks/{id}` | JWT | Update network | +| `DELETE` | `/networks/{id}` | JWT | Delete network | + +### Participants + +| Method | Path | Auth | Description | +|--------|------|------|-------------| +| `POST` | `/networks/{id}/participants` | JWT | Join network | +| `GET` | `/networks/{id}/participants` | JWT | List participants | +| `PATCH` | `/networks/{id}/participants/{pid}` | JWT | Update participant | +| `DELETE` | `/networks/{id}/participants/{pid}` | JWT | Leave network | + +### Communication + +| Method | Path | Auth | Description | +|--------|------|------|-------------| +| `POST` | `/networks/{id}/call` | JWT | Synchronous call (blocks 30s) | +| `POST` | `/networks/{id}/messages/send` | JWT | Async message (webhook + retry) | +| `POST` | `/networks/{id}/mailbox` | JWT | Mailbox (polling only) | +| `GET` | `/networks/{id}/inbox/{pid}` | JWT | Poll unread messages | +| `POST` | `/networks/{id}/messages/ack` | JWT | Acknowledge messages | +| `POST` | `/networks/{id}/participants/{pid}/callback?sig=...&exp=...` | HMAC | Callback (reply_url) | + +### Context & History + +| Method | Path | Auth | Description | +|--------|------|------|-------------| +| `GET` | `/networks/{id}/context` | JWT | Shared context (Redis, fast) | +| `GET` | `/networks/{id}/messages` | JWT | Full message history (Postgres) | + +### A2A Interoperability + +| Method | Path | Auth | Description | +|--------|------|------|-------------| +| `GET` | `/.well-known/agent.json` | None | Platform Agent Card | +| `POST` | `/a2a/agents/import` | JWT | Import external A2A agent | +| `POST` | `/a2a/agents/import/batch` | JWT | Import multiple A2A agents | +| `POST` | `/a2a/tasks/send` | JWT | 
A2A JSON-RPC task send | +| `GET` | `/a2a/agents/{id}/agent-card` | None | Per-agent A2A card | +| `GET` | `/a2a/agents/fetch-card?url=` | JWT | Preview card without importing | + +--- + +## Configuration + +Your network's behavior is controlled by platform settings: + +| Setting | Default | Description | +|---------|---------|-------------| +| `NETWORK_CALLBACK_TIMEOUT_SECONDS` | 30 | How long calls block waiting for a response | +| `NETWORK_MESSAGE_DELIVERY_MAX_RETRIES` | 3 | Retry attempts for failed webhook delivery | +| `NETWORK_CONTEXT_MAX_ENTRIES` | 500 | Max messages in the Redis context stream | +| `NETWORK_CONTEXT_TTL_SECONDS` | 604800 (7d) | How long context entries persist | +| `NETWORK_MAX_PARTICIPANTS` | 50 | Max participants per network | + +--- + +## What's Next + +For E2E testing patterns and reference agent implementations, see [E2E_TEST_SPEC.md](./E2E_TEST_SPEC.md). + +For internal architecture details, see [NETWORKS.md](./NETWORKS.md). + +For A2A protocol details, see [A2A.md](./A2A.md). diff --git a/docs/NETWORKS.md b/docs/NETWORKS.md index 868ba08..48189e3 100644 --- a/docs/NETWORKS.md +++ b/docs/NETWORKS.md @@ -40,7 +40,7 @@ Every message exchanged within a network is recorded and accumulated. 
When Intun ### The `reply_url` Pattern -When Intuno delivers any communication to an external agent, the payload includes a `reply_url`: +When Intuno delivers any communication to an external agent, the payload includes a signed `reply_url`: ```json { @@ -53,9 +53,9 @@ When Intuno delivers any communication to an external agent, the payload include }, "content": "Please review this draft...", "context": [ - {"sender": "User", "recipient": "Writer Agent", "channel": "message", "content": "...", "timestamp": 1711900000} + {"sender": "User", "recipient": "Writer Agent", "channel": "message", "content": "...", "message_id": "uuid", "timestamp": 1711900000} ], - "reply_url": "https://api.intuno.net/networks/{id}/participants/{pid}/callback", + "reply_url": "https://api.intuno.net/networks/{id}/participants/{pid}/callback?sig=abc123...&exp=1712000000", "network_participants": [ {"participant_id": "uuid", "name": "Writer Agent"}, {"participant_id": "uuid", "name": "Reviewer Agent"} @@ -63,7 +63,15 @@ When Intuno delivers any communication to an external agent, the payload include } ``` -The external agent can POST back to the `reply_url` to proactively send messages into the network. No authentication is required on the callback — the URL itself acts as a capability token. +The external agent can POST back to the `reply_url` to proactively send messages into the network. The callback URL is HMAC-SHA256 signed — the `sig` and `exp` query parameters are validated server-side. Signatures expire after 24 hours (configurable). The agent does not need to compute signatures; just POST to the URL as provided. + +### Security + +- **Callback authentication**: Reply URLs are HMAC-SHA256 signed (`src/network/utils/callback_auth.py`). Invalid or expired signatures are rejected with 403. +- **Ownership checks**: All channel operations verify the calling user owns the network. Authenticated users cannot send messages in networks they don't own. 
+- **SSRF protection**: Callback URLs are validated against private IP ranges (10.x, 172.16.x, 192.168.x, 127.x, 169.254.x, IPv6 loopback/ULA). HTTPS is required in production. +- **Content limits**: All message content is capped at 65,536 characters. +- **Topology enforcement**: Communication constraints (star hub-only, ring sequential) are enforced at the service layer — invalid sends return 400. --- @@ -138,14 +146,20 @@ Networks support four topology types that constrain communication patterns: 3. Agent A sends message to Agent B POST /networks/{id}/messages/send → Record in DB + Redis context - → POST to Agent B's callback_url (with context + reply_url) + → POST to Agent B's callback_url (with context + signed reply_url) + → If delivery fails → enqueue for retry via DeliveryWorker (Redis Streams) 4. Agent B responds proactively - POST /networks/{id}/participants/{B}/callback + POST /networks/{id}/participants/{B}/callback?sig=...&exp=... + → Verify HMAC signature (reject if invalid/expired) → Record in DB + Redis context → Forward to Agent A if targeted (with updated context) ``` +### Delivery Worker + +Failed webhook deliveries are automatically retried by a background worker (`src/network/utils/delivery_worker.py`) that consumes from a Redis Stream. Retries use exponential backoff (2, 4, 8 seconds) with a configurable maximum (default: 3 retries). The worker starts automatically with the application. 
+ --- ## Workflow Integration: Loops and Aggregation @@ -207,27 +221,29 @@ src/network/ ├── __init__.py ├── models/ │ ├── entities.py # CommunicationNetwork, NetworkParticipant, NetworkMessage -│ └── schemas.py # Pydantic request/response schemas +│ └── schemas.py # Pydantic request/response schemas (Literal types, content limits) ├── repositories/ -│ └── networks.py # CRUD + context retrieval +│ └── networks.py # CRUD + context retrieval + get_inbox (unread/recipient-only) ├── services/ -│ ├── networks.py # Network management + message recording -│ └── channels.py # Calls, messages, mailboxes, callbacks +│ ├── networks.py # Network management + SSRF-safe URL validation on join +│ └── channels.py # Calls, messages, mailboxes, callbacks + ownership checks + topology ├── routes/ │ ├── networks.py # Network + participant + context endpoints │ ├── channels.py # Call/message/mailbox/inbox endpoints -│ └── callbacks.py # External agent callback receiver +│ └── callbacks.py # HMAC-authenticated callback receiver ├── utils/ -│ ├── context_manager.py # Redis Streams context accumulator -│ ├── delivery_worker.py # Background message delivery (Redis Streams consumer) -│ ├── topology.py # Topology validation and routing +│ ├── callback_auth.py # HMAC-SHA256 signed reply_url generation and verification +│ ├── url_validator.py # SSRF-safe URL validation (rejects private IPs) +│ ├── context_manager.py # Redis Streams context accumulator (includes message_id) +│ ├── delivery_worker.py # Background retry worker (Redis Streams consumer, exponential backoff) +│ ├── topology.py # Topology validation (mesh, star, ring, custom) │ ├── convergence.py # Convergence detectors for loops │ └── aggregator.py # Fan-in aggregation strategies └── a2a/ ├── agent_card.py # A2A Agent Card generation ├── protocol.py # A2A ↔ Intuno format translation - ├── discovery.py # Fetch + import external A2A agents - └── routes.py # A2A-compatible API endpoints + ├── discovery.py # Fetch + import external A2A 
agents (with URL validation) + └── routes.py # A2A-compatible API endpoints (session-safe via Depends) ``` --- diff --git a/src/main.py b/src/main.py index e67ef58..0f1fd46 100644 --- a/src/main.py +++ b/src/main.py @@ -180,9 +180,16 @@ async def lifespan(app: FastAPI): await scheduler.start() await event_consumer.start() + # Network delivery worker (retries failed webhook deliveries) + from src.network.utils.delivery_worker import DeliveryWorker + delivery_worker = DeliveryWorker(app.state.redis) + await delivery_worker.start(http_client=app.state.http_client) + app.state.delivery_worker = delivery_worker + yield # Shutdown + await delivery_worker.stop() await app.state.http_client.aclose() await event_consumer.stop() await scheduler.stop() diff --git a/src/network/a2a/discovery.py b/src/network/a2a/discovery.py index e5b6e95..5c6be35 100644 --- a/src/network/a2a/discovery.py +++ b/src/network/a2a/discovery.py @@ -50,7 +50,11 @@ async def fetch_agent_card(self, base_url: str) -> Optional[dict[str, Any]]: """Fetch an A2A Agent Card from a remote URL. Tries well-known paths if the URL doesn't point directly to a card. + Validates the URL to prevent SSRF attacks. """ + from src.network.utils.url_validator import validate_callback_url + validate_callback_url(base_url) + client = self._http_client owns_client = client is None if owns_client: diff --git a/src/network/a2a/routes.py b/src/network/a2a/routes.py index d16cb60..1e3ef53 100644 --- a/src/network/a2a/routes.py +++ b/src/network/a2a/routes.py @@ -68,6 +68,7 @@ async def a2a_task_send( data: A2ATaskSendRequest, request: Request, current_user: User = Depends(get_current_user), + discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service), ) -> JSONResponse: """A2A-compatible task send endpoint. @@ -75,9 +76,7 @@ async def a2a_task_send( processes it, and returns the result in A2A format. 
""" from src.network.services.channels import ChannelService - from src.network.repositories.networks import NetworkRepository from src.network.utils.context_manager import NetworkContextManager - from src.database import get_redis # Safety check: reject if platform is in emergency halt from src.services.safety import check_platform_halt @@ -102,16 +101,16 @@ async def a2a_task_send( # Convert A2A task to Intuno message format intuno_msg = a2a_task_to_intuno_message(task_data) - # Process through the channel service - try: - redis = request.app.state.redis - repo = NetworkRepository( - session=(await request.app.state.db_session_factory()).__aenter__() - ) - ctx_manager = NetworkContextManager(redis) - channel_service = ChannelService(repo=repo, context_manager=ctx_manager) - channel_service.set_http_client(request.app.state.http_client) + # Use the request-scoped session from the discovery service dependency + from src.network.repositories.networks import NetworkRepository + + redis = request.app.state.redis + repo = NetworkRepository(session=discovery_service.registry_repository.session) + ctx_manager = NetworkContextManager(redis) + channel_service = ChannelService(repo=repo, context_manager=ctx_manager) + channel_service.set_http_client(request.app.state.http_client) + try: channel_type = intuno_msg.get("channel_type", "message") if channel_type == "call" and recipient_participant_id: @@ -121,8 +120,8 @@ async def a2a_task_send( recipient_participant_id=UUID(recipient_participant_id), content=intuno_msg["content"], metadata=intuno_msg.get("metadata"), + owner_id=current_user.id, ) - # Convert result back to A2A task format a2a_result = { "id": result.get("message_id"), "status": {"state": "completed"}, @@ -141,6 +140,7 @@ async def a2a_task_send( recipient_participant_id=UUID(recipient_participant_id), content=intuno_msg["content"], metadata=intuno_msg.get("metadata"), + owner_id=current_user.id, ) a2a_result = intuno_message_to_a2a_task( { @@ -155,10 +155,10 
@@ async def a2a_task_send( return JSONResponse(build_a2a_json_rpc_response(a2a_result, data.id)) - except Exception as exc: + except (ValueError, KeyError) as exc: return JSONResponse( - build_a2a_json_rpc_error(-32603, str(exc), data.id), - status_code=500, + build_a2a_json_rpc_error(-32602, str(exc), data.id), + status_code=400, ) @@ -195,6 +195,7 @@ async def import_a2a_agent( data: A2AImportRequest, request: Request, current_user: User = Depends(get_current_user), + discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service), ) -> JSONResponse: """Import an external A2A agent as a first-class Intuno agent. @@ -202,7 +203,6 @@ async def import_a2a_agent( generates embeddings, and indexes in Qdrant. The agent becomes fully discoverable and invocable — just like any natively registered agent. """ - discovery_service = await _get_discovery_service(request) discovery_service.set_http_client(request.app.state.http_client) try: @@ -230,9 +230,9 @@ async def import_a2a_agents_batch( data: A2ABatchImportRequest, request: Request, current_user: User = Depends(get_current_user), + discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service), ) -> JSONResponse: """Import multiple external A2A agents in one request.""" - discovery_service = await _get_discovery_service(request) discovery_service.set_http_client(request.app.state.http_client) results = await discovery_service.import_multiple(data.urls, current_user.id) @@ -244,9 +244,9 @@ async def refresh_a2a_agent( agent_id: str, request: Request, current_user: User = Depends(get_current_user), + discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service), ) -> JSONResponse: """Re-fetch the Agent Card and update the registry entry.""" - discovery_service = await _get_discovery_service(request) discovery_service.set_http_client(request.app.state.http_client) agent = await discovery_service.registry_repository.get_agent_by_agent_id(agent_id) @@ -277,9 +277,9 @@ async def 
fetch_agent_card_preview( url: str, request: Request, current_user: User = Depends(get_current_user), + discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service), ) -> JSONResponse: """Preview an A2A Agent Card without importing it.""" - discovery_service = await _get_discovery_service(request) discovery_service.set_http_client(request.app.state.http_client) card = await discovery_service.fetch_agent_card(url) @@ -291,14 +291,19 @@ async def fetch_agent_card_preview( return JSONResponse({"success": True, "card": card}) -async def _get_discovery_service(request: Request): - """Helper to build a discovery service from request context.""" +def get_discovery_service( + registry: RegistryRepository = Depends(), +) -> "A2ADiscoveryService": + """FastAPI dependency that provides a properly-scoped discovery service. + + Uses the request-scoped DB session from Depends() to avoid connection + leaks (fixes: previous version created AsyncSessionLocal() without + closing it). + """ from src.network.a2a.discovery import A2ADiscoveryService - from src.database import AsyncSessionLocal from src.utilities.embedding import EmbeddingService - session = AsyncSessionLocal() return A2ADiscoveryService( - registry_repository=RegistryRepository(session=session), + registry_repository=registry, embedding_service=EmbeddingService(), ) diff --git a/src/network/models/schemas.py b/src/network/models/schemas.py index 5e4cf7e..4a7de9d 100644 --- a/src/network/models/schemas.py +++ b/src/network/models/schemas.py @@ -1,25 +1,37 @@ """Pydantic request/response schemas for communication networks.""" from datetime import datetime -from typing import Any, Optional +from typing import Any, Literal, Optional from uuid import UUID from pydantic import BaseModel, ConfigDict, Field +# ── Shared type aliases ───────────────────────────────────────────── + +TopologyLiteral = Literal["mesh", "star", "ring", "custom"] +ChannelLiteral = Literal["call", "message", "mailbox"] 
+NetworkStatusLiteral = Literal["active", "paused", "closed"] +ParticipantTypeLiteral = Literal["agent", "persona", "orchestrator"] +ParticipantStatusLiteral = Literal["active", "disconnected", "removed"] +MessageStatusLiteral = Literal["pending", "delivered", "read", "failed"] + +# Maximum content size for messages (64 KB) +MAX_CONTENT_LENGTH = 65536 + # ── Network schemas ────────────────────────────────────────────────── class NetworkCreate(BaseModel): name: str = Field(..., max_length=255) - topology_type: str = Field(default="mesh", description="mesh | star | ring | custom") + topology_type: TopologyLiteral = Field(default="mesh") metadata: Optional[dict[str, Any]] = None class NetworkUpdate(BaseModel): name: Optional[str] = Field(default=None, max_length=255) - topology_type: Optional[str] = None - status: Optional[str] = None + topology_type: Optional[TopologyLiteral] = None + status: Optional[NetworkStatusLiteral] = None metadata: Optional[dict[str, Any]] = None @@ -41,7 +53,7 @@ class NetworkResponse(BaseModel): class ParticipantJoin(BaseModel): agent_id: Optional[UUID] = None - participant_type: str = Field(default="agent", description="agent | persona | orchestrator") + participant_type: ParticipantTypeLiteral = Field(default="agent") name: str = Field(..., max_length=255) callback_url: Optional[str] = None polling_enabled: bool = False @@ -52,7 +64,7 @@ class ParticipantUpdate(BaseModel): callback_url: Optional[str] = None polling_enabled: Optional[bool] = None capabilities: Optional[dict[str, Any]] = None - status: Optional[str] = None + status: Optional[ParticipantStatusLiteral] = None class ParticipantResponse(BaseModel): @@ -76,8 +88,8 @@ class ParticipantResponse(BaseModel): class NetworkMessageCreate(BaseModel): recipient_participant_id: Optional[UUID] = None - channel_type: str = Field(..., description="call | message | mailbox") - content: str + channel_type: ChannelLiteral = Field(...) 
+ content: str = Field(..., max_length=MAX_CONTENT_LENGTH) metadata: Optional[dict[str, Any]] = None in_reply_to_id: Optional[UUID] = None @@ -98,6 +110,29 @@ class NetworkMessageResponse(BaseModel): updated_at: datetime +# ── Channel request/response (shared by call, message, mailbox) ───── + + +class ChannelRequest(BaseModel): + """Shared request schema for all channel operations.""" + sender_participant_id: UUID + recipient_participant_id: UUID + content: str = Field(..., max_length=MAX_CONTENT_LENGTH) + metadata: Optional[dict[str, Any]] = None + + +class CallResponse(BaseModel): + """Response from a synchronous call.""" + success: bool + message_id: str + response: Any + + +class AckResponse(BaseModel): + """Response from message acknowledgment.""" + acknowledged: int + + # ── Context snapshot ───────────────────────────────────────────────── @@ -106,6 +141,7 @@ class ContextEntry(BaseModel): recipient: Optional[str] = None channel: str content: str + message_id: Optional[str] = None timestamp: datetime diff --git a/src/network/repositories/networks.py b/src/network/repositories/networks.py index 522dcdf..5970c25 100644 --- a/src/network/repositories/networks.py +++ b/src/network/repositories/networks.py @@ -176,6 +176,26 @@ async def get_context( messages.reverse() # chronological order return messages + async def get_inbox( + self, + network_id: UUID, + recipient_id: UUID, + limit: int = 50, + ) -> list[NetworkMessage]: + """Get unread messages where participant is the recipient.""" + q = ( + select(NetworkMessage) + .where( + NetworkMessage.network_id == network_id, + NetworkMessage.recipient_participant_id == recipient_id, + NetworkMessage.status.in_(["pending", "delivered"]), + ) + .order_by(NetworkMessage.created_at) + .limit(limit) + ) + result = await self.session.execute(q) + return list(result.scalars().all()) + async def update_message(self, message: NetworkMessage) -> NetworkMessage: await self.session.commit() await self.session.refresh(message) 
diff --git a/src/network/routes/callbacks.py b/src/network/routes/callbacks.py index 80d1814..fa6a598 100644 --- a/src/network/routes/callbacks.py +++ b/src/network/routes/callbacks.py @@ -2,18 +2,26 @@ This is the key endpoint that enables bidirectional communication. When Intuno delivers a message to an external agent, the payload includes -a ``reply_url`` pointing to this endpoint. The agent can POST back to -proactively send messages into the network. +a signed ``reply_url`` pointing to this endpoint. The agent can POST back +to proactively send messages into the network. + +The reply_url is HMAC-signed so only the intended recipient can use it. """ from typing import Any, Optional from uuid import UUID -from fastapi import APIRouter, Depends, Request +from fastapi import APIRouter, Depends, Query, Request from pydantic import BaseModel, Field -from src.network.models.schemas import NetworkMessageResponse +from src.exceptions import ForbiddenException +from src.network.models.schemas import ( + MAX_CONTENT_LENGTH, + ChannelLiteral, + NetworkMessageResponse, +) from src.network.services.channels import ChannelService +from src.network.utils.callback_auth import verify_callback_signature router = APIRouter(prefix="/networks", tags=["Callbacks"]) @@ -21,9 +29,9 @@ class CallbackPayload(BaseModel): """Payload an external agent sends to its reply_url.""" - content: str + content: str = Field(..., max_length=MAX_CONTENT_LENGTH) recipient_participant_id: Optional[UUID] = None - channel_type: str = Field(default="message", description="message | call | mailbox") + channel_type: ChannelLiteral = Field(default="message") metadata: Optional[dict[str, Any]] = None in_reply_to_id: Optional[UUID] = None @@ -37,19 +45,23 @@ async def receive_callback( participant_id: UUID, data: CallbackPayload, request: Request, + sig: str = Query(..., description="HMAC signature from the signed reply_url"), + exp: str = Query(..., description="Expiry timestamp from the signed reply_url"), 
service: ChannelService = Depends(), ) -> NetworkMessageResponse: """Receive a proactive message from an external agent. - No authentication required — the reply_url itself acts as a capability - token. The participant_id in the URL identifies the sender. + Authentication is via the HMAC-signed reply_url — the ``sig`` and + ``exp`` query parameters are validated before processing. The external agent can: - Reply to a specific message (in_reply_to_id) - Target a specific recipient (recipient_participant_id) - - Broadcast to the network (omit recipient_participant_id) - Choose a channel type (call/message/mailbox) """ + if not verify_callback_signature(network_id, participant_id, sig, exp): + raise ForbiddenException("Invalid or expired callback signature") + service.set_http_client(request.app.state.http_client) return await service.handle_callback( network_id=network_id, diff --git a/src/network/routes/channels.py b/src/network/routes/channels.py index 511dadd..f94073a 100644 --- a/src/network/routes/channels.py +++ b/src/network/routes/channels.py @@ -1,43 +1,25 @@ """Channel routes: calls, messages, mailboxes, inbox, and acknowledgment.""" -from typing import Any, List, Optional +from typing import List, Optional from uuid import UUID from fastapi import APIRouter, Depends, Query, Request, status -from pydantic import BaseModel, Field +from pydantic import BaseModel from src.core.auth import get_current_user from src.models.auth import User -from src.network.models.schemas import NetworkMessageResponse +from src.network.models.schemas import ( + AckResponse, + CallResponse, + ChannelLiteral, + ChannelRequest, + NetworkMessageResponse, +) from src.network.services.channels import ChannelService router = APIRouter(prefix="/networks", tags=["Channels"]) -# ── Request schemas ────────────────────────────────────────────────── - - -class CallRequest(BaseModel): - sender_participant_id: UUID - recipient_participant_id: UUID - content: str - metadata: Optional[dict[str, 
Any]] = None - - -class MessageRequest(BaseModel): - sender_participant_id: UUID - recipient_participant_id: UUID - content: str - metadata: Optional[dict[str, Any]] = None - - -class MailboxRequest(BaseModel): - sender_participant_id: UUID - recipient_participant_id: UUID - content: str - metadata: Optional[dict[str, Any]] = None - - class AckRequest(BaseModel): message_ids: list[UUID] @@ -45,14 +27,14 @@ class AckRequest(BaseModel): # ── Call ───────────────────────────────────────────────────────────── -@router.post("/{network_id}/call") +@router.post("/{network_id}/call", response_model=CallResponse) async def make_call( network_id: UUID, - data: CallRequest, + data: ChannelRequest, request: Request, current_user: User = Depends(get_current_user), service: ChannelService = Depends(), -) -> dict: +) -> CallResponse: """Synchronous call to another participant. Blocks until response.""" service.set_http_client(request.app.state.http_client) return await service.call( @@ -61,6 +43,7 @@ async def make_call( recipient_participant_id=data.recipient_participant_id, content=data.content, metadata=data.metadata, + owner_id=current_user.id, ) @@ -74,7 +57,7 @@ async def make_call( ) async def send_message( network_id: UUID, - data: MessageRequest, + data: ChannelRequest, request: Request, current_user: User = Depends(get_current_user), service: ChannelService = Depends(), @@ -87,6 +70,7 @@ async def send_message( recipient_participant_id=data.recipient_participant_id, content=data.content, metadata=data.metadata, + owner_id=current_user.id, ) @@ -100,7 +84,7 @@ async def send_message( ) async def send_to_mailbox( network_id: UUID, - data: MailboxRequest, + data: ChannelRequest, current_user: User = Depends(get_current_user), service: ChannelService = Depends(), ) -> NetworkMessageResponse: @@ -111,6 +95,7 @@ async def send_to_mailbox( recipient_participant_id=data.recipient_participant_id, content=data.content, metadata=data.metadata, + owner_id=current_user.id, ) @@ 
-122,17 +107,18 @@ async def get_inbox( network_id: UUID, participant_id: UUID, current_user: User = Depends(get_current_user), - channel_type: Optional[str] = Query(default=None), + channel_type: Optional[ChannelLiteral] = Query(default=None), limit: int = Query(default=50, ge=1, le=200), service: ChannelService = Depends(), ) -> List[NetworkMessageResponse]: - """Poll inbox for a participant.""" + """Poll inbox for a participant. Returns unread messages only.""" channel_types = [channel_type] if channel_type else None messages = await service.get_inbox( network_id=network_id, participant_id=participant_id, channel_types=channel_types, limit=limit, + owner_id=current_user.id, ) return messages @@ -140,13 +126,15 @@ async def get_inbox( # ── Acknowledge ────────────────────────────────────────────────────── -@router.post("/{network_id}/messages/ack") +@router.post("/{network_id}/messages/ack", response_model=AckResponse) async def acknowledge_messages( network_id: UUID, data: AckRequest, current_user: User = Depends(get_current_user), service: ChannelService = Depends(), -) -> dict: +) -> AckResponse: """Mark messages as read.""" - count = await service.acknowledge(network_id, data.message_ids) - return {"acknowledged": count} + count = await service.acknowledge( + network_id, data.message_ids, owner_id=current_user.id + ) + return AckResponse(acknowledged=count) diff --git a/src/network/services/channels.py b/src/network/services/channels.py index 09a4025..300fbe9 100644 --- a/src/network/services/channels.py +++ b/src/network/services/channels.py @@ -7,7 +7,6 @@ import json import logging -import time from typing import Any, Optional from uuid import UUID @@ -15,7 +14,7 @@ from fastapi import Depends from src.core.settings import settings -from src.exceptions import BadRequestException, NotFoundException +from src.exceptions import BadRequestException, ForbiddenException, NotFoundException from src.network.models.entities import ( ChannelType, 
CommunicationNetwork, @@ -27,7 +26,9 @@ ) from src.network.models.schemas import NetworkMessageCreate from src.network.repositories.networks import NetworkRepository +from src.network.utils.callback_auth import sign_callback_url from src.network.utils.context_manager import NetworkContextManager +from src.network.utils.topology import TopologyValidator logger = logging.getLogger(__name__) @@ -43,6 +44,7 @@ def __init__( self.repo = repo self.ctx = context_manager self._http_client: Optional[httpx.AsyncClient] = None + self._topology = TopologyValidator() def set_http_client(self, client: httpx.AsyncClient) -> None: self._http_client = client @@ -56,13 +58,15 @@ async def call( recipient_participant_id: UUID, content: str, metadata: Optional[dict[str, Any]] = None, + owner_id: Optional[UUID] = None, ) -> dict[str, Any]: """Synchronous call: send payload to recipient, wait for response. Returns a dict with the call result including the recipient's response. """ sender, recipient, network = await self._validate_communication( - network_id, sender_participant_id, recipient_participant_id + network_id, sender_participant_id, recipient_participant_id, + owner_id=owner_id, ) if not recipient.callback_url: @@ -84,7 +88,7 @@ async def call( context_entries = await self.ctx.get_context_window(network_id, limit=30) participants = await self.repo.list_participants(network_id) - # Build the payload with reply_url + # Build the payload with signed reply_url payload = self._build_delivery_payload( network_id=network_id, sender=sender, @@ -135,6 +139,7 @@ async def send_message( recipient_participant_id: UUID, content: str, metadata: Optional[dict[str, Any]] = None, + owner_id: Optional[UUID] = None, ) -> NetworkMessage: """Non-blocking message: record and push via webhook. @@ -142,9 +147,11 @@ async def send_message( is best-effort with retries handled by the delivery worker. 
""" sender, recipient, network = await self._validate_communication( - network_id, sender_participant_id, recipient_participant_id + network_id, sender_participant_id, recipient_participant_id, + owner_id=owner_id, ) + # Record message and attempt delivery in one logical operation message = await self._record_message( network_id=network_id, sender=sender, @@ -177,10 +184,14 @@ async def send_message( message.status = MessageStatus.delivered except Exception: logger.warning( - "Message delivery failed for participant %s; will retry", + "Message delivery failed for participant %s; enqueuing for retry", recipient_participant_id, ) message.status = MessageStatus.pending + # Enqueue for retry via delivery worker + await self._enqueue_delivery( + recipient.callback_url, payload, str(message.id) + ) await self.repo.update_message(message) return message @@ -194,10 +205,12 @@ async def send_to_mailbox( recipient_participant_id: UUID, content: str, metadata: Optional[dict[str, Any]] = None, + owner_id: Optional[UUID] = None, ) -> NetworkMessage: """Async mailbox: store message, no push delivery.""" sender, recipient, network = await self._validate_communication( - network_id, sender_participant_id, recipient_participant_id + network_id, sender_participant_id, recipient_participant_id, + owner_id=owner_id, ) return await self._record_message( @@ -217,21 +230,35 @@ async def get_inbox( participant_id: UUID, channel_types: Optional[list[str]] = None, limit: int = 50, + owner_id: Optional[UUID] = None, ) -> list[NetworkMessage]: - """Get unread messages for a participant.""" - messages = await self.repo.list_messages( + """Get unread messages for a participant (recipient only).""" + # Verify ownership + if owner_id: + network = await self.repo.get_network(network_id) + if not network or network.owner_id != owner_id: + raise ForbiddenException("You don't own this network") + + messages = await self.repo.get_inbox( network_id=network_id, + recipient_id=participant_id, 
limit=limit, - participant_id=participant_id, ) if channel_types: messages = [m for m in messages if m.channel_type in channel_types] return messages async def acknowledge( - self, network_id: UUID, message_ids: list[UUID] + self, network_id: UUID, message_ids: list[UUID], + owner_id: Optional[UUID] = None, ) -> int: """Mark messages as read.""" + # Verify ownership + if owner_id: + network = await self.repo.get_network(network_id) + if not network or network.owner_id != owner_id: + raise ForbiddenException("You don't own this network") + count = 0 for msg_id in message_ids: message = await self.repo.get_message(msg_id) @@ -256,7 +283,7 @@ async def handle_callback( """Handle a proactive message from an external agent via callback URL. This is the key to bidirectionality: external agents POST to their - reply_url and this method records the message in the network. + signed reply_url and this method records the message in the network. """ # Safety check: reject if platform is in emergency halt from src.services.safety import check_platform_halt @@ -290,31 +317,11 @@ async def handle_callback( # If there's a specific recipient with a callback_url, forward the message if recipient and recipient.callback_url and channel_type == "message": - context_entries = await self.ctx.get_context_window(network_id, limit=20) - participants = await self.repo.list_participants(network_id) - payload = self._build_delivery_payload( - network_id=network_id, - sender=sender, - recipient=recipient, - channel=channel_type, - content=content, - context=context_entries, - participants=participants, - message_id=message.id, + await self._forward_to_participant( + network_id, sender, recipient, channel_type, content, message.id ) - try: - await self._deliver_http( - recipient.callback_url, - payload, - timeout=settings.NETWORK_CALLBACK_TIMEOUT_SECONDS, - ) - message.status = MessageStatus.delivered - await self.repo.update_message(message) - except Exception: - logger.warning( - "Forwarding 
callback message failed for participant %s", - recipient_participant_id, - ) + message.status = MessageStatus.delivered + await self.repo.update_message(message) return message @@ -325,6 +332,7 @@ async def _validate_communication( network_id: UUID, sender_id: UUID, recipient_id: UUID, + owner_id: Optional[UUID] = None, ) -> tuple[NetworkParticipant, NetworkParticipant, CommunicationNetwork]: # Safety check: reject if platform is in emergency halt from src.services.safety import check_agent_active, check_platform_halt @@ -336,6 +344,10 @@ async def _validate_communication( if network.status != NetworkStatus.active: raise BadRequestException("Network is not active") + # Ownership check: verify the calling user owns this network + if owner_id and network.owner_id != owner_id: + raise ForbiddenException("You don't own this network") + sender = await self.repo.get_participant(sender_id) if not sender or sender.network_id != network_id: raise NotFoundException("Sender participant") @@ -354,6 +366,10 @@ async def _validate_communication( if recipient.agent_id: await check_agent_active(recipient.agent_id) + # Topology validation: enforce communication constraints + participants = await self.repo.list_participants(network_id) + self._topology.validate(network, sender, recipient, participants) + return sender, recipient, network async def _record_message( @@ -401,6 +417,15 @@ def _build_delivery_payload( message_id: UUID, ) -> dict[str, Any]: """Build the standard payload delivered to external agents.""" + # Build the signed reply_url + raw_reply_url = ( + f"{settings.BASE_URL}/networks/{network_id}" + f"/participants/{recipient.id}/callback" + ) + signed_reply_url = sign_callback_url( + raw_reply_url, network_id, recipient.id + ) + return { "network_id": str(network_id), "message_id": str(message_id), @@ -411,10 +436,7 @@ def _build_delivery_payload( }, "content": content, "context": context, - "reply_url": ( - f"{settings.BASE_URL}/networks/{network_id}" - 
f"/participants/{recipient.id}/callback" - ), + "reply_url": signed_reply_url, "network_participants": [ {"participant_id": str(p.id), "name": p.name} for p in participants @@ -422,6 +444,58 @@ def _build_delivery_payload( ], } + async def _forward_to_participant( + self, + network_id: UUID, + sender: NetworkParticipant, + recipient: NetworkParticipant, + channel_type: str, + content: str, + message_id: UUID, + ) -> None: + """Forward a message to a participant with a callback_url.""" + context_entries = await self.ctx.get_context_window(network_id, limit=20) + participants = await self.repo.list_participants(network_id) + payload = self._build_delivery_payload( + network_id=network_id, + sender=sender, + recipient=recipient, + channel=channel_type, + content=content, + context=context_entries, + participants=participants, + message_id=message_id, + ) + try: + await self._deliver_http( + recipient.callback_url, + payload, + timeout=settings.NETWORK_CALLBACK_TIMEOUT_SECONDS, + ) + except Exception: + logger.warning( + "Forwarding callback message failed for participant %s", + recipient.id, + ) + + async def _enqueue_delivery( + self, callback_url: str, payload: dict, message_id: str + ) -> None: + """Enqueue a failed delivery for retry via the delivery worker.""" + try: + from src.network.utils.delivery_worker import DeliveryWorker + redis = self.ctx._redis + await DeliveryWorker.enqueue( + redis, + callback_url=callback_url, + payload=payload, + message_id=message_id, + ) + except Exception: + logger.warning( + "Failed to enqueue delivery for retry (message %s)", message_id + ) + async def _deliver_http( self, url: str, diff --git a/src/network/services/networks.py b/src/network/services/networks.py index 2cf4321..fa49dc8 100644 --- a/src/network/services/networks.py +++ b/src/network/services/networks.py @@ -96,6 +96,9 @@ async def join_network( raise BadRequestException( "Participant must have a callback_url or polling_enabled" ) + if data.callback_url: + from 
src.network.utils.url_validator import validate_callback_url + validate_callback_url(data.callback_url) if data.agent_id: existing = await self.repo.get_participant_by_agent(network_id, data.agent_id) if existing: diff --git a/src/network/utils/callback_auth.py b/src/network/utils/callback_auth.py new file mode 100644 index 0000000..91b8a84 --- /dev/null +++ b/src/network/utils/callback_auth.py @@ -0,0 +1,85 @@ +"""HMAC-signed callback URLs for authenticated bidirectional communication. + +When delivering a message to an external agent, the reply_url is signed +with an HMAC so that only the intended recipient can POST back. This +prevents attackers from injecting messages by guessing participant IDs. + +Signature: HMAC-SHA256(network_id + participant_id + expiry, secret) +""" + +import hashlib +import hmac +import time +from urllib.parse import parse_qs, urlencode, urlparse, urlunparse +from uuid import UUID + +from src.core.settings import settings + +# Default callback URL expiry: 24 hours +CALLBACK_EXPIRY_SECONDS = 86400 + + +def sign_callback_url( + base_url: str, + network_id: UUID, + participant_id: UUID, + secret: str | None = None, + expiry_seconds: int = CALLBACK_EXPIRY_SECONDS, +) -> str: + """Append HMAC signature and expiry to a callback URL. + + Returns the URL with ?sig=&exp= appended. + """ + secret = secret or settings.JWT_SECRET_KEY + exp = int(time.time()) + expiry_seconds + + sig = _compute_signature(network_id, participant_id, exp, secret) + + parsed = urlparse(base_url) + params = parse_qs(parsed.query) + params["sig"] = [sig] + params["exp"] = [str(exp)] + new_query = urlencode(params, doseq=True) + signed_url = urlunparse(parsed._replace(query=new_query)) + return signed_url + + +def verify_callback_signature( + network_id: UUID, + participant_id: UUID, + sig: str, + exp: str, + secret: str | None = None, +) -> bool: + """Verify an HMAC signature from a callback URL. + + Returns True if the signature is valid and not expired. 
+ """ + secret = secret or settings.JWT_SECRET_KEY + + # Check expiry + try: + exp_ts = int(exp) + except (ValueError, TypeError): + return False + + if time.time() > exp_ts: + return False + + expected = _compute_signature(network_id, participant_id, exp_ts, secret) + return hmac.compare_digest(sig, expected) + + +def _compute_signature( + network_id: UUID, + participant_id: UUID, + exp: int, + secret: str, +) -> str: + """Compute HMAC-SHA256 signature for callback authentication.""" + message = f"{network_id}:{participant_id}:{exp}" + return hmac.new( + secret.encode("utf-8"), + message.encode("utf-8"), + hashlib.sha256, + ).hexdigest() diff --git a/src/network/utils/context_manager.py b/src/network/utils/context_manager.py index 8b880f3..a54a4b9 100644 --- a/src/network/utils/context_manager.py +++ b/src/network/utils/context_manager.py @@ -66,6 +66,7 @@ async def get_context_window( "recipient": data["recipient"] or None, "channel": data["channel"], "content": data["content"], + "message_id": data.get("message_id") or None, "timestamp": float(data["ts"]), } ) diff --git a/src/network/utils/delivery_worker.py b/src/network/utils/delivery_worker.py index 1e1c880..c5f3028 100644 --- a/src/network/utils/delivery_worker.py +++ b/src/network/utils/delivery_worker.py @@ -10,6 +10,7 @@ import asyncio import json import logging +import time from typing import Any import httpx @@ -61,6 +62,7 @@ async def enqueue( payload: dict[str, Any], message_id: str, attempt: int = 0, + deliver_after: float = 0, ) -> None: """Enqueue a delivery task into the Redis Stream.""" await redis.xadd( @@ -70,6 +72,7 @@ async def enqueue( "payload": json.dumps(payload, default=str), "message_id": message_id, "attempt": str(attempt), + "deliver_after": str(deliver_after), }, ) @@ -99,6 +102,14 @@ async def _deliver(self, stream_id: str, data: dict[str, str]) -> None: payload_raw = data.get("payload", "{}") message_id = data.get("message_id", "") attempt = int(data.get("attempt", "0")) + 
deliver_after = float(data.get("deliver_after", "0")) + + # Respect delayed delivery (for exponential backoff retries) + if deliver_after > 0: + now = time.time() + if now < deliver_after: + delay = min(deliver_after - now, 60) + await asyncio.sleep(delay) try: payload = json.loads(payload_raw) @@ -110,6 +121,7 @@ async def _deliver(self, stream_id: str, data: dict[str, str]) -> None: ) owns_client = self._http_client is None + delivered = False try: response = await client.post( callback_url, @@ -122,6 +134,7 @@ async def _deliver(self, stream_id: str, data: dict[str, str]) -> None: ) if response.status_code == 200: logger.debug("Delivered message %s to %s", message_id, callback_url) + delivered = True else: logger.warning( "Delivery to %s returned %d for message %s", @@ -129,7 +142,6 @@ async def _deliver(self, stream_id: str, data: dict[str, str]) -> None: response.status_code, message_id, ) - await self._maybe_retry(data, attempt) except Exception: logger.warning( "Delivery to %s failed for message %s (attempt %d)", @@ -137,21 +149,47 @@ async def _deliver(self, stream_id: str, data: dict[str, str]) -> None: message_id, attempt, ) - await self._maybe_retry(data, attempt) finally: if owns_client: await client.aclose() - await self._redis.xack(STREAM_KEY, CONSUMER_GROUP, stream_id) + if delivered: + # ACK only on successful delivery + await self._redis.xack(STREAM_KEY, CONSUMER_GROUP, stream_id) + else: + # Attempt retry with exponential backoff + retried = await self._maybe_retry(data, attempt) + if retried: + # Re-enqueue succeeded — ACK the original to prevent duplicate + await self._redis.xack(STREAM_KEY, CONSUMER_GROUP, stream_id) + # If retry failed, don't ACK — consumer group will redeliver + + async def _maybe_retry(self, data: dict[str, str], attempt: int) -> bool: + """Re-enqueue for retry with exponential backoff. 
Returns True on success.""" + if attempt >= settings.NETWORK_MESSAGE_DELIVERY_MAX_RETRIES: + logger.error( + "Max retries reached for message %s to %s", + data.get("message_id", ""), + data.get("callback_url", ""), + ) + return False + + backoff = 2 ** (attempt + 1) + deliver_at = time.time() + backoff - async def _maybe_retry(self, data: dict[str, str], attempt: int) -> None: - if attempt < settings.NETWORK_MESSAGE_DELIVERY_MAX_RETRIES: - backoff = 2 ** (attempt + 1) - await asyncio.sleep(backoff) + try: await self.enqueue( self._redis, callback_url=data.get("callback_url", ""), payload=json.loads(data.get("payload", "{}")), message_id=data.get("message_id", ""), attempt=attempt + 1, + deliver_after=deliver_at, + ) + return True + except Exception: + logger.warning( + "Failed to re-enqueue delivery for message %s", + data.get("message_id", ""), ) + return False diff --git a/src/network/utils/url_validator.py b/src/network/utils/url_validator.py new file mode 100644 index 0000000..f51d128 --- /dev/null +++ b/src/network/utils/url_validator.py @@ -0,0 +1,89 @@ +"""SSRF-safe URL validation for callback URLs and external fetches. + +Rejects private/internal IP ranges and validates URL format to prevent +Server-Side Request Forgery attacks when the platform makes outbound +HTTP requests to user-supplied URLs. 
+""" + +import ipaddress +import logging +import socket +from urllib.parse import urlparse + +from src.core.settings import settings +from src.exceptions import BadRequestException + +logger = logging.getLogger(__name__) + +# Private and internal IP networks that should never be targeted +_BLOCKED_NETWORKS = [ + ipaddress.ip_network("0.0.0.0/8"), + ipaddress.ip_network("10.0.0.0/8"), + ipaddress.ip_network("100.64.0.0/10"), + ipaddress.ip_network("127.0.0.0/8"), + ipaddress.ip_network("169.254.0.0/16"), + ipaddress.ip_network("172.16.0.0/12"), + ipaddress.ip_network("192.0.0.0/24"), + ipaddress.ip_network("192.168.0.0/16"), + ipaddress.ip_network("198.18.0.0/15"), + ipaddress.ip_network("::1/128"), + ipaddress.ip_network("fc00::/7"), + ipaddress.ip_network("fe80::/10"), +] + + +def _is_private_ip(hostname: str) -> bool: + """Check if a hostname resolves to a private/internal IP.""" + try: + addr = ipaddress.ip_address(hostname) + return any(addr in net for net in _BLOCKED_NETWORKS) + except ValueError: + # Not an IP literal — resolve the hostname + pass + + try: + for info in socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP): + addr = ipaddress.ip_address(info[4][0]) + if any(addr in net for net in _BLOCKED_NETWORKS): + return True + except (socket.gaierror, OSError): + # DNS resolution failed — reject as suspicious + return True + + return False + + +def validate_callback_url(url: str) -> str: + """Validate a callback URL is safe for the platform to POST to. + + Returns the validated URL string. + Raises BadRequestException if the URL is unsafe. 
+ """ + if not url or not url.strip(): + raise BadRequestException("Callback URL cannot be empty") + + parsed = urlparse(url) + + # Must be HTTP or HTTPS + if parsed.scheme not in ("http", "https"): + raise BadRequestException( + f"Callback URL must use http or https scheme, got '{parsed.scheme}'" + ) + + # Require HTTPS in production + if settings.ENVIRONMENT != "development" and parsed.scheme != "https": + raise BadRequestException( + "Callback URL must use HTTPS in production" + ) + + # Must have a hostname + if not parsed.hostname: + raise BadRequestException("Callback URL must include a hostname") + + # Reject private/internal IPs + if _is_private_ip(parsed.hostname): + raise BadRequestException( + "Callback URL must not target private or internal networks" + ) + + return url diff --git a/tests/test_networks.py b/tests/test_networks.py index b4c0380..3a56882 100644 --- a/tests/test_networks.py +++ b/tests/test_networks.py @@ -19,10 +19,18 @@ import httpx import pytest +from src.network.utils.callback_auth import sign_callback_url + BASE_URL = os.getenv("TEST_BASE_URL", "http://localhost:8000") TIMEOUT = 15 +def _signed_callback_url(network_id: str, participant_id: str) -> str: + """Generate a signed callback URL for testing.""" + raw = f"{BASE_URL}/networks/{network_id}/participants/{participant_id}/callback" + return sign_callback_url(raw, uuid.UUID(network_id), uuid.UUID(participant_id)) + + # ── Helpers ────────────────────────────────────────────────────────── @@ -294,9 +302,10 @@ async def test_callback_bidirectional(authed): assert resp.status_code == 201 original_msg_id = resp.json()["id"] - # Agent B responds proactively via callback (no auth required) + # Agent B responds proactively via signed callback URL + callback_url = _signed_callback_url(network_id, agent_b_id) resp = await client.post( - f"{BASE_URL}/networks/{network_id}/participants/{agent_b_id}/callback", + callback_url, json={ "content": "Analysis complete. 
Found 3 anomalies.", "recipient_participant_id": agent_a_id, @@ -370,9 +379,10 @@ async def test_multi_participant_context(authed): }, ) - # C → A (proactive via callback) + # C → A (proactive via signed callback) + callback_url = _signed_callback_url(network_id, participant_ids["Persona C"]) await client.post( - f"{BASE_URL}/networks/{network_id}/participants/{participant_ids['Persona C']}/callback", + callback_url, json={ "content": "Thanks for including me! I have some ideas to share.", "recipient_participant_id": participant_ids["Persona A"], diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_aggregator.py b/tests/unit/test_aggregator.py new file mode 100644 index 0000000..6bef797 --- /dev/null +++ b/tests/unit/test_aggregator.py @@ -0,0 +1,110 @@ +"""Unit tests for aggregation strategies.""" + +import pytest + +from src.network.utils.aggregator import ( + LLMSummarizeAggregator, + MergeAggregator, + VoteAggregator, + create_aggregator, +) + + +class TestMergeAggregator: + @pytest.mark.asyncio + async def test_merges_by_source(self): + agg = MergeAggregator() + result = await agg.aggregate([ + {"source": "agent_1", "output": "Result A"}, + {"source": "agent_2", "output": "Result B"}, + ]) + assert result["strategy"] == "merge" + assert result["result"]["agent_1"] == "Result A" + assert result["result"]["agent_2"] == "Result B" + + @pytest.mark.asyncio + async def test_auto_keys_without_source(self): + agg = MergeAggregator() + result = await agg.aggregate([ + {"output": "Result A"}, + {"output": "Result B"}, + ]) + assert result["strategy"] == "merge" + assert len(result["result"]) == 2 + + @pytest.mark.asyncio + async def test_empty_inputs(self): + agg = MergeAggregator() + result = await agg.aggregate([]) + assert result["strategy"] == "merge" + assert result["result"] == {} + + +class TestVoteAggregator: + @pytest.mark.asyncio + async def test_majority_wins(self): + agg = 
VoteAggregator() + result = await agg.aggregate([ + {"output": "yes"}, + {"output": "yes"}, + {"output": "no"}, + ]) + assert result["strategy"] == "vote" + assert result["result"] == "yes" + assert result["total"] == 3 + + @pytest.mark.asyncio + async def test_tie_picks_one(self): + agg = VoteAggregator() + result = await agg.aggregate([ + {"output": "a"}, + {"output": "b"}, + ]) + assert result["strategy"] == "vote" + assert result["result"] in ("a", "b") + + @pytest.mark.asyncio + async def test_empty_inputs(self): + agg = VoteAggregator() + result = await agg.aggregate([]) + assert result["strategy"] == "vote" + assert result["result"] is None + + @pytest.mark.asyncio + async def test_none_outputs(self): + agg = VoteAggregator() + result = await agg.aggregate([ + {"output": None}, + {"output": None}, + ]) + assert result["strategy"] == "vote" + assert result["result"] == "null" + + +class TestLLMSummarizeAggregator: + @pytest.mark.asyncio + async def test_falls_back_to_merge_on_error(self): + """Without a real OpenAI key, should fall back to merge.""" + agg = LLMSummarizeAggregator() + result = await agg.aggregate([ + {"source": "agent_1", "output": "Result A"}, + {"source": "agent_2", "output": "Result B"}, + ]) + # Should fall back since no API key is set in tests + assert result["strategy"] == "llm_summarize_fallback" + assert "result" in result + + +class TestCreateAggregator: + def test_creates_merge(self): + assert isinstance(create_aggregator("merge"), MergeAggregator) + + def test_creates_vote(self): + assert isinstance(create_aggregator("vote"), VoteAggregator) + + def test_creates_llm_summarize(self): + assert isinstance(create_aggregator("llm_summarize"), LLMSummarizeAggregator) + + def test_unknown_strategy_raises(self): + with pytest.raises(ValueError, match="Unknown"): + create_aggregator("nonexistent") diff --git a/tests/unit/test_callback_auth.py b/tests/unit/test_callback_auth.py new file mode 100644 index 0000000..4c85493 --- /dev/null +++ 
b/tests/unit/test_callback_auth.py @@ -0,0 +1,124 @@ +"""Unit tests for HMAC-signed callback URL authentication.""" + +import time +from unittest.mock import patch +from uuid import uuid4 + +import pytest + +from src.network.utils.callback_auth import ( + CALLBACK_EXPIRY_SECONDS, + sign_callback_url, + verify_callback_signature, +) + + +@pytest.fixture +def network_id(): + return uuid4() + + +@pytest.fixture +def participant_id(): + return uuid4() + + +SECRET = "test-secret-key-for-unit-tests" + + +class TestSignCallbackUrl: + def test_appends_sig_and_exp(self, network_id, participant_id): + url = f"https://api.intuno.ai/networks/{network_id}/participants/{participant_id}/callback" + signed = sign_callback_url(url, network_id, participant_id, secret=SECRET) + assert "sig=" in signed + assert "exp=" in signed + + def test_preserves_base_url(self, network_id, participant_id): + base = "https://api.intuno.ai/networks/test/callback" + signed = sign_callback_url(base, network_id, participant_id, secret=SECRET) + assert signed.startswith("https://api.intuno.ai/networks/test/callback?") + + +class TestVerifyCallbackSignature: + def test_valid_signature(self, network_id, participant_id): + url = "https://example.com/callback" + signed = sign_callback_url(url, network_id, participant_id, secret=SECRET) + # Extract sig and exp from the signed URL + from urllib.parse import parse_qs, urlparse + parsed = urlparse(signed) + params = parse_qs(parsed.query) + sig = params["sig"][0] + exp = params["exp"][0] + + assert verify_callback_signature( + network_id, participant_id, sig, exp, secret=SECRET + ) + + def test_wrong_network_id(self, network_id, participant_id): + url = "https://example.com/callback" + signed = sign_callback_url(url, network_id, participant_id, secret=SECRET) + from urllib.parse import parse_qs, urlparse + parsed = urlparse(signed) + params = parse_qs(parsed.query) + + wrong_network = uuid4() + assert not verify_callback_signature( + wrong_network, 
participant_id, + params["sig"][0], params["exp"][0], + secret=SECRET, + ) + + def test_wrong_participant_id(self, network_id, participant_id): + url = "https://example.com/callback" + signed = sign_callback_url(url, network_id, participant_id, secret=SECRET) + from urllib.parse import parse_qs, urlparse + parsed = urlparse(signed) + params = parse_qs(parsed.query) + + wrong_participant = uuid4() + assert not verify_callback_signature( + network_id, wrong_participant, + params["sig"][0], params["exp"][0], + secret=SECRET, + ) + + def test_expired_signature(self, network_id, participant_id): + url = "https://example.com/callback" + # Sign with 0 expiry (already expired) + signed = sign_callback_url( + url, network_id, participant_id, secret=SECRET, expiry_seconds=-1 + ) + from urllib.parse import parse_qs, urlparse + parsed = urlparse(signed) + params = parse_qs(parsed.query) + + assert not verify_callback_signature( + network_id, participant_id, + params["sig"][0], params["exp"][0], + secret=SECRET, + ) + + def test_tampered_signature(self, network_id, participant_id): + assert not verify_callback_signature( + network_id, participant_id, + "tampered_signature", str(int(time.time()) + 3600), + secret=SECRET, + ) + + def test_invalid_exp_format(self, network_id, participant_id): + assert not verify_callback_signature( + network_id, participant_id, "some_sig", "not_a_number", secret=SECRET + ) + + def test_wrong_secret(self, network_id, participant_id): + url = "https://example.com/callback" + signed = sign_callback_url(url, network_id, participant_id, secret=SECRET) + from urllib.parse import parse_qs, urlparse + parsed = urlparse(signed) + params = parse_qs(parsed.query) + + assert not verify_callback_signature( + network_id, participant_id, + params["sig"][0], params["exp"][0], + secret="wrong-secret", + ) diff --git a/tests/unit/test_convergence.py b/tests/unit/test_convergence.py new file mode 100644 index 0000000..91b0b52 --- /dev/null +++ 
b/tests/unit/test_convergence.py @@ -0,0 +1,119 @@ +"""Unit tests for convergence detectors.""" + +import pytest + +from src.network.utils.convergence import ( + ApprovalDetector, + MaxIterationsDetector, + SimilarityDetector, + create_detector, +) + + +class TestMaxIterationsDetector: + @pytest.mark.asyncio + async def test_converges_at_max(self): + d = MaxIterationsDetector(max_iterations=3) + assert not await d.has_converged(0, "out", None, {}) + assert not await d.has_converged(2, "out", "prev", {}) + assert await d.has_converged(3, "out", "prev", {}) + + @pytest.mark.asyncio + async def test_converges_beyond_max(self): + d = MaxIterationsDetector(max_iterations=2) + assert await d.has_converged(5, "out", "prev", {}) + + +class TestApprovalDetector: + @pytest.mark.asyncio + async def test_detects_approved_string(self): + d = ApprovalDetector() + assert await d.has_converged(1, "This is approved.", None, {}) + + @pytest.mark.asyncio + async def test_detects_lgtm(self): + d = ApprovalDetector() + assert await d.has_converged(1, "LGTM, ship it!", None, {}) + + @pytest.mark.asyncio + async def test_detects_dict_approved_flag(self): + d = ApprovalDetector() + assert await d.has_converged(1, {"approved": True}, None, {}) + + @pytest.mark.asyncio + async def test_no_approval_in_text(self): + d = ApprovalDetector() + assert not await d.has_converged(1, "needs more work", None, {}) + + @pytest.mark.asyncio + async def test_dict_without_approved_key(self): + d = ApprovalDetector() + assert not await d.has_converged(1, {"output": "still working"}, None, {}) + + @pytest.mark.asyncio + async def test_non_string_non_dict(self): + d = ApprovalDetector() + assert not await d.has_converged(1, 42, None, {}) + + +class TestSimilarityDetector: + @pytest.mark.asyncio + async def test_no_convergence_on_first_iteration(self): + d = SimilarityDetector(threshold=0.95) + assert not await d.has_converged(0, "hello world", None, {}) + + @pytest.mark.asyncio + async def 
test_identical_outputs_converge(self): + d = SimilarityDetector(threshold=0.95) + assert await d.has_converged(1, "hello world", "hello world", {}) + + @pytest.mark.asyncio + async def test_different_outputs_dont_converge(self): + d = SimilarityDetector(threshold=0.95) + assert not await d.has_converged( + 1, "completely different text", "hello world", {} + ) + + @pytest.mark.asyncio + async def test_nearly_identical_converge(self): + d = SimilarityDetector(threshold=0.7) + text1 = "the quick brown fox jumps over the lazy dog" + text2 = "the quick brown fox leaps over the lazy dog" + # Jaccard: 7 shared words / 9 unique words ≈ 0.78, above the 0.7 threshold + assert await d.has_converged(1, text1, text2, {}) + + @pytest.mark.asyncio + async def test_dict_output_extraction(self): + d = SimilarityDetector(threshold=0.95) + assert await d.has_converged( + 1, + {"output": "same text here"}, + {"output": "same text here"}, + {}, + ) + + @pytest.mark.asyncio + async def test_empty_strings(self): + d = SimilarityDetector(threshold=0.95) + # Both empty — no convergence (empty check returns False) + assert not await d.has_converged(1, "", "", {}) + + +class TestCreateDetector: + def test_creates_similarity(self): + d = create_detector("similarity", {"threshold": 0.9}) + assert isinstance(d, SimilarityDetector) + assert d.threshold == 0.9 + + def test_creates_approval(self): + d = create_detector("approval") + assert isinstance(d, ApprovalDetector) + + def test_creates_max_iterations(self): + d = create_detector("max_iterations", {"max_iterations": 10}) + assert isinstance(d, MaxIterationsDetector) + assert d.max_iterations == 10 + + def test_unknown_type_raises(self): + with pytest.raises(ValueError, match="Unknown"): + create_detector("nonexistent") diff --git a/tests/unit/test_topology.py b/tests/unit/test_topology.py new file mode 100644 index 0000000..a72d1aa --- /dev/null +++ b/tests/unit/test_topology.py @@ -0,0 +1,133 @@ +"""Unit tests for topology validation. 
+ +Uses MagicMock objects to avoid importing the full ORM model chain +which triggers circular imports in isolation. +""" + +import sys +from types import ModuleType +from unittest.mock import MagicMock +from uuid import uuid4 + +import pytest + +# Stub out the heavy dependency chain before importing TopologyValidator +_base_mod = ModuleType("src.models.base") +_base_mod.BaseModel = type("BaseModel", (), {}) +sys.modules.setdefault("src.models.base", _base_mod) + +from src.exceptions import BadRequestException # noqa: E402 +from src.network.utils.topology import TopologyValidator # noqa: E402 + + +def _make_participant(pid=None, name="Agent"): + p = MagicMock() + p.id = pid or uuid4() + p.name = name + return p + + +def _make_network(topology="mesh"): + n = MagicMock() + n.topology_type = topology + return n + + +class TestMeshTopology: + def test_allows_any_communication(self): + v = TopologyValidator() + a, b = _make_participant(name="A"), _make_participant(name="B") + network = _make_network("mesh") + # Should not raise + v.validate(network, a, b, [a, b]) + + def test_get_reachable_returns_all_others(self): + v = TopologyValidator() + a, b, c = ( + _make_participant(name="A"), + _make_participant(name="B"), + _make_participant(name="C"), + ) + network = _make_network("mesh") + reachable = v.get_reachable(network, a, [a, b, c]) + assert set(p.id for p in reachable) == {b.id, c.id} + + +class TestStarTopology: + def test_hub_can_send_to_anyone(self): + v = TopologyValidator() + hub = _make_participant(name="Hub") + spoke = _make_participant(name="Spoke") + network = _make_network("star") + # Hub is first participant — should not raise + v.validate(network, hub, spoke, [hub, spoke]) + + def test_spoke_cannot_initiate(self): + v = TopologyValidator() + hub = _make_participant(name="Hub") + spoke = _make_participant(name="Spoke") + network = _make_network("star") + with pytest.raises(BadRequestException, match="hub"): + v.validate(network, spoke, hub, [hub, spoke]) + 
+ def test_get_reachable_for_hub(self): + v = TopologyValidator() + hub = _make_participant(name="Hub") + s1 = _make_participant(name="S1") + s2 = _make_participant(name="S2") + network = _make_network("star") + reachable = v.get_reachable(network, hub, [hub, s1, s2]) + assert set(p.id for p in reachable) == {s1.id, s2.id} + + def test_get_reachable_for_spoke(self): + v = TopologyValidator() + hub = _make_participant(name="Hub") + spoke = _make_participant(name="Spoke") + network = _make_network("star") + reachable = v.get_reachable(network, spoke, [hub, spoke]) + assert len(reachable) == 1 + assert reachable[0].id == hub.id + + +class TestRingTopology: + def test_allows_next_in_ring(self): + v = TopologyValidator() + a = _make_participant(name="A") + b = _make_participant(name="B") + c = _make_participant(name="C") + network = _make_network("ring") + # A -> B (next in order) + v.validate(network, a, b, [a, b, c]) + # B -> C (next in order) + v.validate(network, b, c, [a, b, c]) + # C -> A (wraps around) + v.validate(network, c, a, [a, b, c]) + + def test_rejects_skip_in_ring(self): + v = TopologyValidator() + a = _make_participant(name="A") + b = _make_participant(name="B") + c = _make_participant(name="C") + network = _make_network("ring") + # A -> C (skipping B) + with pytest.raises(BadRequestException, match="Ring topology"): + v.validate(network, a, c, [a, b, c]) + + def test_get_reachable_returns_next_only(self): + v = TopologyValidator() + a = _make_participant(name="A") + b = _make_participant(name="B") + c = _make_participant(name="C") + network = _make_network("ring") + reachable = v.get_reachable(network, a, [a, b, c]) + assert len(reachable) == 1 + assert reachable[0].id == b.id + + +class TestCustomTopology: + def test_allows_any_communication(self): + v = TopologyValidator() + a, b = _make_participant(name="A"), _make_participant(name="B") + network = _make_network("custom") + # Should not raise + v.validate(network, a, b, [a, b]) diff --git 
a/tests/unit/test_url_validator.py b/tests/unit/test_url_validator.py new file mode 100644 index 0000000..b1e9ee1 --- /dev/null +++ b/tests/unit/test_url_validator.py @@ -0,0 +1,73 @@ +"""Unit tests for SSRF-safe URL validation.""" + +from unittest.mock import patch + +import pytest + +from src.exceptions import BadRequestException +from src.network.utils.url_validator import validate_callback_url + + +class TestValidateCallbackUrl: + def test_valid_https_url(self): + """Patches the private-IP check to avoid DNS resolution issues in CI.""" + with patch("src.network.utils.url_validator._is_private_ip", return_value=False): + url = "https://example.com/callback" + assert validate_callback_url(url) == url + + def test_valid_http_in_development(self): + with patch("src.network.utils.url_validator.settings") as mock_settings, \ + patch("src.network.utils.url_validator._is_private_ip", return_value=False): + mock_settings.ENVIRONMENT = "development" + url = "http://example.com/callback" + assert validate_callback_url(url) == url + + def test_rejects_http_in_production(self): + with patch("src.network.utils.url_validator.settings") as mock_settings: + mock_settings.ENVIRONMENT = "production" + with pytest.raises(BadRequestException, match="HTTPS"): + validate_callback_url("http://example.com/callback") + + def test_rejects_empty_url(self): + with pytest.raises(BadRequestException, match="empty"): + validate_callback_url("") + + def test_rejects_whitespace_only(self): + with pytest.raises(BadRequestException, match="empty"): + validate_callback_url(" ") + + def test_rejects_ftp_scheme(self): + with pytest.raises(BadRequestException, match="http or https"): + validate_callback_url("ftp://example.com/file") + + def test_rejects_javascript_scheme(self): + with pytest.raises(BadRequestException, match="http or https"): + validate_callback_url("javascript:alert(1)") + + def test_rejects_no_hostname(self): + with pytest.raises(BadRequestException, match="hostname"): + 
validate_callback_url("https:///path") + + def test_rejects_localhost(self): + with pytest.raises(BadRequestException, match="private"): + validate_callback_url("https://127.0.0.1/callback") + + def test_rejects_private_10_range(self): + with pytest.raises(BadRequestException, match="private"): + validate_callback_url("https://10.0.0.1/callback") + + def test_rejects_private_172_range(self): + with pytest.raises(BadRequestException, match="private"): + validate_callback_url("https://172.16.0.1/callback") + + def test_rejects_private_192_range(self): + with pytest.raises(BadRequestException, match="private"): + validate_callback_url("https://192.168.1.1/callback") + + def test_rejects_link_local(self): + with pytest.raises(BadRequestException, match="private"): + validate_callback_url("https://169.254.169.254/latest/meta-data/") + + def test_rejects_ipv6_loopback(self): + with pytest.raises(BadRequestException, match="private"): + validate_callback_url("https://[::1]/callback")