831 changes: 831 additions & 0 deletions docs/E2E_TEST_SPEC.md

Large diffs are not rendered by default.

711 changes: 711 additions & 0 deletions docs/INTEGRATION_GUIDE.md

Large diffs are not rendered by default.

48 changes: 32 additions & 16 deletions docs/NETWORKS.md
@@ -40,7 +40,7 @@ Every message exchanged within a network is recorded and accumulated. When Intun

### The `reply_url` Pattern

When Intuno delivers any communication to an external agent, the payload includes a `reply_url`:
When Intuno delivers any communication to an external agent, the payload includes a signed `reply_url`:

```json
{
@@ -53,17 +53,25 @@ When Intuno delivers any communication to an external agent, the payload include
},
"content": "Please review this draft...",
"context": [
{"sender": "User", "recipient": "Writer Agent", "channel": "message", "content": "...", "timestamp": 1711900000}
{"sender": "User", "recipient": "Writer Agent", "channel": "message", "content": "...", "message_id": "uuid", "timestamp": 1711900000}
],
"reply_url": "https://api.intuno.net/networks/{id}/participants/{pid}/callback",
"reply_url": "https://api.intuno.net/networks/{id}/participants/{pid}/callback?sig=abc123...&exp=1712000000",
"network_participants": [
{"participant_id": "uuid", "name": "Writer Agent"},
{"participant_id": "uuid", "name": "Reviewer Agent"}
]
}
```

The external agent can POST back to the `reply_url` to proactively send messages into the network. No authentication is required on the callback — the URL itself acts as a capability token.
The external agent can POST back to the `reply_url` to proactively send messages into the network. The callback URL is HMAC-SHA256 signed — the `sig` and `exp` query parameters are validated server-side. Signatures expire after 24 hours (configurable). The agent does not need to compute signatures; just POST to the URL as provided.
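
As a rough illustration of how a signed `reply_url` of this shape could be produced and checked, here is a minimal sketch. It is not the code in `src/network/utils/callback_auth.py`: the secret, the function names `sign_reply_url` and `verify_reply_url`, and the exact string being signed (`base_url|exp`) are assumptions made for this example.

```python
import hashlib
import hmac
import time
from typing import Optional
from urllib.parse import urlencode

# Hypothetical secret; a real deployment would load this from configuration.
SECRET = b"example-secret"
DEFAULT_TTL_SECONDS = 24 * 60 * 60  # 24 hours, matching the documented default


def _signature(base_url: str, exp: int) -> str:
    # Assumed signing payload: callback URL and expiry joined by "|".
    payload = f"{base_url}|{exp}".encode()
    return hmac.new(SECRET, payload, hashlib.sha256).hexdigest()


def sign_reply_url(
    base_url: str,
    ttl_seconds: int = DEFAULT_TTL_SECONDS,
    now: Optional[int] = None,
) -> str:
    """Return base_url with `sig` and `exp` query parameters appended."""
    issued_at = int(time.time()) if now is None else now
    exp = issued_at + ttl_seconds
    query = urlencode({"sig": _signature(base_url, exp), "exp": exp})
    return f"{base_url}?{query}"


def verify_reply_url(
    base_url: str, sig: str, exp: int, now: Optional[int] = None
) -> bool:
    """Return True only if the signature matches and has not expired."""
    current = int(time.time()) if now is None else now
    if current > exp:
        return False
    # compare_digest avoids leaking match position through timing.
    return hmac.compare_digest(_signature(base_url, exp), sig)
```

In this sketch the expiry is part of the signed payload, so changing `exp` in the query string invalidates the signature. A server following this pattern would map a failed check to the 403 response described in the Security section.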

### Security

- **Callback authentication**: Reply URLs are HMAC-SHA256 signed (`src/network/utils/callback_auth.py`). Invalid or expired signatures are rejected with 403.
- **Ownership checks**: All channel operations verify the calling user owns the network. Authenticated users cannot send messages in networks they don't own.
- **SSRF protection**: Callback URLs are validated against private IP ranges (10.x, 172.16.x, 192.168.x, 127.x, 169.254.x, IPv6 loopback/ULA). HTTPS is required in production.
- **Content limits**: All message content is capped at 65,536 characters.
- **Topology enforcement**: Communication constraints (star hub-only, ring sequential) are enforced at the service layer — invalid sends return 400.
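
The SSRF rule in the list above can be sketched with the standard library. This is an assumption-laden illustration, not the contents of `src/network/utils/url_validator.py`: the names `is_blocked_address` and `validate_callback_url_sketch` are hypothetical, and the sketch only inspects literal IP addresses. A hostname that resolves to a private address would pass here, so a production validator would also need to check the resolved addresses.

```python
import ipaddress
from urllib.parse import urlparse


def is_blocked_address(host: str) -> bool:
    """Return True if host is a literal private, loopback, or link-local IP."""
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal (e.g. a DNS name); this sketch does not resolve it.
        return False
    return addr.is_private or addr.is_loopback or addr.is_link_local


def validate_callback_url_sketch(url: str, require_https: bool = True) -> None:
    """Raise ValueError if the URL is not an acceptable callback target."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    if require_https and parsed.scheme != "https":
        raise ValueError("HTTPS is required")
    if not parsed.hostname:
        raise ValueError("URL has no host")
    if is_blocked_address(parsed.hostname):
        raise ValueError(f"blocked address: {parsed.hostname}")
```

Passing `require_https=False` would correspond to a non-production setting in which plain HTTP callbacks are tolerated; the private-range check applies either way.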

---

@@ -138,14 +146,20 @@ Networks support four topology types that constrain communication patterns:
3. Agent A sends message to Agent B
POST /networks/{id}/messages/send
→ Record in DB + Redis context
→ POST to Agent B's callback_url (with context + reply_url)
→ POST to Agent B's callback_url (with context + signed reply_url)
→ If delivery fails → enqueue for retry via DeliveryWorker (Redis Streams)

4. Agent B responds proactively
POST /networks/{id}/participants/{B}/callback
POST /networks/{id}/participants/{B}/callback?sig=...&exp=...
→ Verify HMAC signature (reject if invalid/expired)
→ Record in DB + Redis context
→ Forward to Agent A if targeted (with updated context)
```

### Delivery Worker

Failed webhook deliveries are automatically retried by a background worker (`src/network/utils/delivery_worker.py`) that consumes from a Redis Stream. Retries use exponential backoff (2, 4, 8 seconds) with a configurable maximum (default: 3 retries). The worker starts automatically with the application.
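
The retry schedule described above (2, 4, 8 seconds, three retries by default) can be sketched as follows. This is only an illustration of the backoff arithmetic, not the actual `DeliveryWorker`: the names `backoff_delays` and `deliver_with_retry` are hypothetical, the Redis Stream is left out entirely, and `send` stands in for whatever coroutine performs the webhook POST.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delays(max_retries: int = 3, base_seconds: float = 2.0) -> list[float]:
    """Delay before each retry: base, 2*base, 4*base, ..."""
    return [base_seconds * (2 ** attempt) for attempt in range(max_retries)]


async def deliver_with_retry(
    send: Callable[[], Awaitable[bool]],
    max_retries: int = 3,
    base_seconds: float = 2.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Try once, then retry with exponential backoff until success or exhaustion."""
    if await send():
        return True
    for delay in backoff_delays(max_retries, base_seconds):
        await sleep(delay)  # wait before the next attempt
        if await send():
            return True
    return False  # all retries used; the caller decides how to mark the failure
```

The `sleep` parameter is injectable so the schedule can be exercised in tests without waiting for real time to pass.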

---

## Workflow Integration: Loops and Aggregation
@@ -207,27 +221,29 @@ src/network/
├── __init__.py
├── models/
│ ├── entities.py # CommunicationNetwork, NetworkParticipant, NetworkMessage
│ └── schemas.py # Pydantic request/response schemas
│ └── schemas.py # Pydantic request/response schemas (Literal types, content limits)
├── repositories/
│ └── networks.py # CRUD + context retrieval
│ └── networks.py # CRUD + context retrieval + get_inbox (unread/recipient-only)
├── services/
│ ├── networks.py # Network management + message recording
│ └── channels.py # Calls, messages, mailboxes, callbacks
│ ├── networks.py # Network management + SSRF-safe URL validation on join
│ └── channels.py # Calls, messages, mailboxes, callbacks + ownership checks + topology
├── routes/
│ ├── networks.py # Network + participant + context endpoints
│ ├── channels.py # Call/message/mailbox/inbox endpoints
│ └── callbacks.py # External agent callback receiver
│ └── callbacks.py # HMAC-authenticated callback receiver
├── utils/
│ ├── context_manager.py # Redis Streams context accumulator
│ ├── delivery_worker.py # Background message delivery (Redis Streams consumer)
│ ├── topology.py # Topology validation and routing
│ ├── callback_auth.py # HMAC-SHA256 signed reply_url generation and verification
│ ├── url_validator.py # SSRF-safe URL validation (rejects private IPs)
│ ├── context_manager.py # Redis Streams context accumulator (includes message_id)
│ ├── delivery_worker.py # Background retry worker (Redis Streams consumer, exponential backoff)
│ ├── topology.py # Topology validation (mesh, star, ring, custom)
│ ├── convergence.py # Convergence detectors for loops
│ └── aggregator.py # Fan-in aggregation strategies
└── a2a/
├── agent_card.py # A2A Agent Card generation
├── protocol.py # A2A ↔ Intuno format translation
├── discovery.py # Fetch + import external A2A agents
└── routes.py # A2A-compatible API endpoints
├── discovery.py # Fetch + import external A2A agents (with URL validation)
└── routes.py # A2A-compatible API endpoints (session-safe via Depends)
```

---
7 changes: 7 additions & 0 deletions src/main.py
@@ -180,9 +180,16 @@ async def lifespan(app: FastAPI):
await scheduler.start()
await event_consumer.start()

# Network delivery worker (retries failed webhook deliveries)
from src.network.utils.delivery_worker import DeliveryWorker
delivery_worker = DeliveryWorker(app.state.redis)
await delivery_worker.start(http_client=app.state.http_client)
app.state.delivery_worker = delivery_worker

yield

# Shutdown
await delivery_worker.stop()
await app.state.http_client.aclose()
await event_consumer.stop()
await scheduler.stop()
4 changes: 4 additions & 0 deletions src/network/a2a/discovery.py
@@ -50,7 +50,11 @@ async def fetch_agent_card(self, base_url: str) -> Optional[dict[str, Any]]:
"""Fetch an A2A Agent Card from a remote URL.

Tries well-known paths if the URL doesn't point directly to a card.
Validates the URL to prevent SSRF attacks.
"""
from src.network.utils.url_validator import validate_callback_url
validate_callback_url(base_url)

client = self._http_client
owns_client = client is None
if owns_client:
53 changes: 29 additions & 24 deletions src/network/a2a/routes.py
@@ -68,16 +68,15 @@ async def a2a_task_send(
data: A2ATaskSendRequest,
request: Request,
current_user: User = Depends(get_current_user),
discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service),
) -> JSONResponse:
"""A2A-compatible task send endpoint.

Receives an A2A task, translates it to an Intuno network message,
processes it, and returns the result in A2A format.
"""
from src.network.services.channels import ChannelService
from src.network.repositories.networks import NetworkRepository
from src.network.utils.context_manager import NetworkContextManager
from src.database import get_redis

# Safety check: reject if platform is in emergency halt
from src.services.safety import check_platform_halt
@@ -102,16 +101,16 @@ async def a2a_task_send(
# Convert A2A task to Intuno message format
intuno_msg = a2a_task_to_intuno_message(task_data)

# Process through the channel service
try:
redis = request.app.state.redis
repo = NetworkRepository(
session=(await request.app.state.db_session_factory()).__aenter__()
)
ctx_manager = NetworkContextManager(redis)
channel_service = ChannelService(repo=repo, context_manager=ctx_manager)
channel_service.set_http_client(request.app.state.http_client)
# Use the request-scoped session from the discovery service dependency
from src.network.repositories.networks import NetworkRepository

redis = request.app.state.redis
repo = NetworkRepository(session=discovery_service.registry_repository.session)
ctx_manager = NetworkContextManager(redis)
channel_service = ChannelService(repo=repo, context_manager=ctx_manager)
channel_service.set_http_client(request.app.state.http_client)

try:
channel_type = intuno_msg.get("channel_type", "message")

if channel_type == "call" and recipient_participant_id:
@@ -121,8 +120,8 @@ async def a2a_task_send(
recipient_participant_id=UUID(recipient_participant_id),
content=intuno_msg["content"],
metadata=intuno_msg.get("metadata"),
owner_id=current_user.id,
)
# Convert result back to A2A task format
a2a_result = {
"id": result.get("message_id"),
"status": {"state": "completed"},
@@ -141,6 +140,7 @@ async def a2a_task_send(
recipient_participant_id=UUID(recipient_participant_id),
content=intuno_msg["content"],
metadata=intuno_msg.get("metadata"),
owner_id=current_user.id,
)
a2a_result = intuno_message_to_a2a_task(
{
@@ -155,10 +155,10 @@ async def a2a_task_send(

return JSONResponse(build_a2a_json_rpc_response(a2a_result, data.id))

except Exception as exc:
except (ValueError, KeyError) as exc:
return JSONResponse(
build_a2a_json_rpc_error(-32603, str(exc), data.id),
status_code=500,
build_a2a_json_rpc_error(-32602, str(exc), data.id),
status_code=400,
)


@@ -195,14 +195,14 @@ async def import_a2a_agent(
data: A2AImportRequest,
request: Request,
current_user: User = Depends(get_current_user),
discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service),
) -> JSONResponse:
"""Import an external A2A agent as a first-class Intuno agent.

Fetches the Agent Card from the given URL, creates a registry entry,
generates embeddings, and indexes in Qdrant. The agent becomes fully
discoverable and invocable — just like any natively registered agent.
"""
discovery_service = await _get_discovery_service(request)
discovery_service.set_http_client(request.app.state.http_client)

try:
@@ -230,9 +230,9 @@ async def import_a2a_agents_batch(
data: A2ABatchImportRequest,
request: Request,
current_user: User = Depends(get_current_user),
discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service),
) -> JSONResponse:
"""Import multiple external A2A agents in one request."""
discovery_service = await _get_discovery_service(request)
discovery_service.set_http_client(request.app.state.http_client)

results = await discovery_service.import_multiple(data.urls, current_user.id)
@@ -244,9 +244,9 @@ async def refresh_a2a_agent(
agent_id: str,
request: Request,
current_user: User = Depends(get_current_user),
discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service),
) -> JSONResponse:
"""Re-fetch the Agent Card and update the registry entry."""
discovery_service = await _get_discovery_service(request)
discovery_service.set_http_client(request.app.state.http_client)

agent = await discovery_service.registry_repository.get_agent_by_agent_id(agent_id)
@@ -277,9 +277,9 @@ async def fetch_agent_card_preview(
url: str,
request: Request,
current_user: User = Depends(get_current_user),
discovery_service: "A2ADiscoveryService" = Depends(get_discovery_service),
) -> JSONResponse:
"""Preview an A2A Agent Card without importing it."""
discovery_service = await _get_discovery_service(request)
discovery_service.set_http_client(request.app.state.http_client)

card = await discovery_service.fetch_agent_card(url)
@@ -291,14 +291,19 @@ async def fetch_agent_card_preview(
return JSONResponse({"success": True, "card": card})


async def _get_discovery_service(request: Request):
"""Helper to build a discovery service from request context."""
def get_discovery_service(
registry: RegistryRepository = Depends(),
) -> "A2ADiscoveryService":
"""FastAPI dependency that provides a properly-scoped discovery service.

Uses the request-scoped DB session from Depends() to avoid connection
leaks (fixes: previous version created AsyncSessionLocal() without
closing it).
"""
from src.network.a2a.discovery import A2ADiscoveryService
from src.database import AsyncSessionLocal
from src.utilities.embedding import EmbeddingService

session = AsyncSessionLocal()
return A2ADiscoveryService(
registry_repository=RegistryRepository(session=session),
registry_repository=registry,
embedding_service=EmbeddingService(),
)
52 changes: 44 additions & 8 deletions src/network/models/schemas.py
@@ -1,25 +1,37 @@
"""Pydantic request/response schemas for communication networks."""

from datetime import datetime
from typing import Any, Optional
from typing import Any, Literal, Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field

# ── Shared type aliases ─────────────────────────────────────────────

TopologyLiteral = Literal["mesh", "star", "ring", "custom"]
ChannelLiteral = Literal["call", "message", "mailbox"]
NetworkStatusLiteral = Literal["active", "paused", "closed"]
ParticipantTypeLiteral = Literal["agent", "persona", "orchestrator"]
ParticipantStatusLiteral = Literal["active", "disconnected", "removed"]
MessageStatusLiteral = Literal["pending", "delivered", "read", "failed"]

# Maximum content size for messages (64 KB)
MAX_CONTENT_LENGTH = 65536


# ── Network schemas ──────────────────────────────────────────────────


class NetworkCreate(BaseModel):
name: str = Field(..., max_length=255)
topology_type: str = Field(default="mesh", description="mesh | star | ring | custom")
topology_type: TopologyLiteral = Field(default="mesh")
metadata: Optional[dict[str, Any]] = None


class NetworkUpdate(BaseModel):
name: Optional[str] = Field(default=None, max_length=255)
topology_type: Optional[str] = None
status: Optional[str] = None
topology_type: Optional[TopologyLiteral] = None
status: Optional[NetworkStatusLiteral] = None
metadata: Optional[dict[str, Any]] = None


@@ -41,7 +53,7 @@ class NetworkResponse(BaseModel):

class ParticipantJoin(BaseModel):
agent_id: Optional[UUID] = None
participant_type: str = Field(default="agent", description="agent | persona | orchestrator")
participant_type: ParticipantTypeLiteral = Field(default="agent")
name: str = Field(..., max_length=255)
callback_url: Optional[str] = None
polling_enabled: bool = False
@@ -52,7 +64,7 @@ class ParticipantUpdate(BaseModel):
callback_url: Optional[str] = None
polling_enabled: Optional[bool] = None
capabilities: Optional[dict[str, Any]] = None
status: Optional[str] = None
status: Optional[ParticipantStatusLiteral] = None


class ParticipantResponse(BaseModel):
@@ -76,8 +88,8 @@ class ParticipantResponse(BaseModel):

class NetworkMessageCreate(BaseModel):
recipient_participant_id: Optional[UUID] = None
channel_type: str = Field(..., description="call | message | mailbox")
content: str
channel_type: ChannelLiteral = Field(...)
content: str = Field(..., max_length=MAX_CONTENT_LENGTH)
metadata: Optional[dict[str, Any]] = None
in_reply_to_id: Optional[UUID] = None

@@ -98,6 +110,29 @@ class NetworkMessageResponse(BaseModel):
updated_at: datetime


# ── Channel request/response (shared by call, message, mailbox) ─────


class ChannelRequest(BaseModel):
"""Shared request schema for all channel operations."""
sender_participant_id: UUID
recipient_participant_id: UUID
content: str = Field(..., max_length=MAX_CONTENT_LENGTH)
metadata: Optional[dict[str, Any]] = None


class CallResponse(BaseModel):
"""Response from a synchronous call."""
success: bool
message_id: str
response: Any


class AckResponse(BaseModel):
"""Response from message acknowledgment."""
acknowledged: int


# ── Context snapshot ─────────────────────────────────────────────────


@@ -106,6 +141,7 @@ class ContextEntry(BaseModel):
recipient: Optional[str] = None
channel: str
content: str
message_id: Optional[str] = None
timestamp: datetime

