Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 66 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ AgentRegistry is the connective tissue. An agent registers itself when it starts
│ AgentRegistry.Api │
│ ASP.NET Core 10 · Minimal APIs · Scalar UI · OpenTelemetry │
│ Auth: API key (Admin/Agent scopes) + JWT Bearer │
│ Protocol adapters: A2A · MCP
│ Protocol adapters: A2A · MCP · ACP · Queued A2A
└────────────┬───────────────────────┬────────────────────────────┘
│ │
┌────────────▼───────┐ ┌───────────▼────────────────────────────┐
Expand Down Expand Up @@ -60,6 +60,7 @@ Detailed design rationale for each adapter is in [`/docs`](docs/):
- [A2A adapter design](docs/protocol-a2a.md)
- [MCP adapter design](docs/protocol-mcp.md)
- [ACP adapter design](docs/protocol-acp.md)
- [Queued A2A adapter design](docs/protocol-queued-a2a.md)

### A2A (Agent-to-Agent)

Expand Down Expand Up @@ -96,6 +97,18 @@ Targets [ACP spec 0.2.0](https://github.com/i-am-bee/acp) (IBM Research / BeeAI)

The manifest carries MIME-typed content types, JSON Schema for input/output/config/thread state, performance status metrics, and rich metadata (framework, natural languages, license, author). All fields round-trip through `Endpoint.ProtocolMetadata`. Agent names are normalised to RFC 1123 DNS-label format on manifest generation.

### Queued A2A (A2A over async message brokers)

Agents that communicate via **RabbitMQ** or **Azure Service Bus** rather than HTTP. The A2A wire protocol (task request / status update / result message shapes) is unchanged — only the transport differs. Callers publish A2A task messages to a named topic on the broker and receive responses asynchronously, enabling KEDA-scaled workers, long-running tasks, and decoupled agent pipelines.

- `GET /a2a/async/agents/{id}` — queued A2A agent card with full broker connection details (public)
- `GET /a2a/async/agents` — filtered list of queued agents (public)
- `POST /a2a/async/agents` — register by submitting a queued agent card (Agent or Admin)

The `queueEndpoint` object in each card carries `technology` (`"rabbitmq"` or `"azure-service-bus"`), `host`, `port`, `virtualHost`, `exchange`, `taskTopic` (where callers publish), and `responseTopic` (where callers listen for replies). For Azure Service Bus, `namespace` and `entityPath` replace the AMQP-specific fields.

Queued agents also appear in `GET /discover/agents?protocol=A2A` alongside HTTP A2A agents since they share the same protocol type. See [Queued A2A adapter design](docs/protocol-queued-a2a.md) for the full design rationale.

### Generic (protocol-agnostic)

All protocols can also be registered and discovered through the generic API, which returns the registry's own domain model rather than protocol-native card formats.
Expand Down Expand Up @@ -230,7 +243,7 @@ The registry accepts two authentication methods, selected by header:
- A `registry_scope` claim with value `Admin` or `Agent`
- A `roles` claim with value `registry.admin` or `registry.agent`

Discovery, the MCP server endpoint, and protocol card endpoints (`/discover/agents`, `/mcp`, `/a2a/agents/*`, `/mcp/servers/*`) are always public — no auth required.
Discovery, the MCP server endpoint, and protocol card endpoints (`/discover/agents`, `/mcp`, `/a2a/agents/*`, `/a2a/async/agents/*`, `/mcp/servers/*`, `/acp/agents/*`) are always public — no auth required.

## API overview

Expand Down Expand Up @@ -273,6 +286,14 @@ Discovery, the MCP server endpoint, and protocol card endpoints (`/discover/agen
| `GET` | `/acp/agents` | Public | Filtered list of ACP agent manifests |
| `POST` | `/acp/agents` | Agent or Admin | Register by submitting an ACP agent manifest |

### Queued A2A protocol

| Method | Path | Auth | Description |
|---|---|---|---|
| `GET` | `/a2a/async/agents/{id}` | Public | Queued A2A card with broker connection details |
| `GET` | `/a2a/async/agents` | Public | Filtered list of queued A2A agent cards |
| `POST` | `/a2a/async/agents` | Agent or Admin | Register an agent with queue endpoint details |

### Key management and system

| Method | Path | Auth | Description |
Expand Down Expand Up @@ -354,20 +375,53 @@ The two services are non-conflicting — if a config-defined agent also self-reg

## Queue-backed agents

Agents using AMQP or Azure Service Bus don't need to be running when discovered. The registry stores the queue address as the endpoint:
Agents using AMQP or Azure Service Bus don't need to be running when discovered. The registry stores the queue address as the endpoint. There are two ways to register a queue-backed agent:

### Queued A2A (recommended for A2A agents over a broker)

Use `POST /a2a/async/agents` with the full `queueEndpoint` connection details:

```json
{
"name": "ResearchAgent",
"description": "On-demand research agent",
"version": "1.0",
"skills": [{ "id": "research", "name": "Research", "description": "Researches a topic", "tags": ["search"] }],
"defaultInputModes": ["application/json"],
"defaultOutputModes": ["application/json"],
"queueEndpoint": {
"technology": "rabbitmq",
"host": "rabbitmq.example.com",
"port": 5672,
"virtualHost": "/",
"exchange": "rockbot",
"taskTopic": "agent.task.ResearchAgent",
"responseTopic": "agent.response.{callerName}"
}
}
```

Clients discover the agent via `GET /a2a/async/agents` and receive the full broker connection details needed to publish A2A task messages directly.

### Generic registration

Use `POST /agents` with explicit `transport` and `protocol` fields. This works for any protocol/transport combination but returns the registry's internal model rather than a protocol-native card:

```json
{
"name": "async-processor",
"transport": "AzureServiceBus",
"protocol": "A2A",
"address": "agents/summarizer/requests",
"livenessModel": "Ephemeral",
"ttlSeconds": 60
"endpoints": [{
"name": "queue",
"transport": "AzureServiceBus",
"protocol": "A2A",
"address": "agents/summarizer/requests",
"livenessModel": "Ephemeral",
"ttlSeconds": 60
}]
}
```

A KEDA-scaled worker registers on startup, processes jobs, and the TTL expires naturally when the scaling group idles. Consumers route work to the queue address — whether the worker is currently running or not is KEDA's concern.
In both cases, a KEDA-scaled worker registers on startup, processes jobs, and the TTL expires naturally when the scaling group idles. Consumers route work to the queue address — whether the worker is currently running or not is KEDA's concern. See [Queued A2A adapter design](docs/protocol-queued-a2a.md) for the full pattern including liveness and round-tripping.

## Configuration reference

Expand Down Expand Up @@ -429,9 +483,10 @@ src/
AgentRegistry.Infrastructure/ EF Core (PostgreSQL), Redis, SQL API key service
AgentRegistry.Api/ ASP.NET Core 10 minimal API, auth, Scalar
Protocols/
A2A/ A2A v1.0 RC agent card adapter
A2A/ A2A v1.0 RC agent card adapter (HTTP)
MCP/ MCP 2025-11-25 server card adapter (Streamable HTTP)
ACP/ ACP 0.2.0 agent manifest adapter
QueuedA2A/ A2A over async message brokers (RabbitMQ, Azure Service Bus)
tests/
AgentRegistry.Domain.Tests/ Domain unit tests
AgentRegistry.Application.Tests/ Service tests using Rocks source-gen mocks
Expand All @@ -440,6 +495,7 @@ tests/
A2A/ A2A endpoint tests
MCP/ MCP endpoint tests
ACP/ ACP endpoint tests
QueuedA2A/ Queued A2A endpoint tests
k8s/
redis.yaml Redis StatefulSet + Service
agentregistry/
Expand Down
200 changes: 200 additions & 0 deletions docs/async-messaging-plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# Async Messaging Support Plan

## Background

The A2A, MCP, and ACP protocol adapters in this registry assume synchronous HTTP transport.
The rockbot project demonstrates a different pattern: agents communicate via A2A **over async
message brokers** (RabbitMQ, Azure Service Bus, etc.), using fire-and-forget task publication
with a correlated async response channel.

This plan adds first-class support for registering and discovering agents that are reachable
via a message queue rather than an HTTP endpoint.

## Design Decisions

### Reuse `ProtocolType.A2A`, add a new adapter

The wire protocol between agents (task request/status/result message shapes) is still A2A.
What changes is the **transport**: instead of `POST /` over HTTP, callers publish a JSON
message to an exchange/topic on a broker.

Rather than shoehorn queue connection details into the existing HTTP-oriented A2A adapter,
we add a dedicated **`Protocols/QueuedA2A/`** adapter with its own models and API surface.

### Technology-agnostic model

Supported queue technologies (first release):

| Technology | `TransportType` | Key connection fields |
|------------------|--------------------------|----------------------------------------------------|
| RabbitMQ / AMQP | `Amqp` | host, port, virtualHost, exchange, taskRoutingKey |
| Azure Service Bus| `AzureServiceBus` | namespace (FQDN), entityPath |

No domain enum changes are required; `TransportType.Amqp` and
`TransportType.AzureServiceBus` already exist.

### Connection details stored in `ProtocolMetadata`

All broker-specific fields are stored as JSONB in `Endpoint.ProtocolMetadata`, following the
same round-trip pattern used by the other adapters. The `Endpoint.Address` field carries the
primary queue/topic name (the address a client publishes task messages to).

### New API surface

```
POST /a2a/async/agents Register an agent with queue endpoint details
GET /a2a/async/agents List/discover queued agents (filterable)
GET /a2a/async/agents/{id} Retrieve a specific queued agent card
```

Discovery endpoints are public. Registration requires `AgentOrAdmin` auth.

---

## Queued Agent Card Model

```jsonc
{
// Standard A2A fields
"id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"name": "ResearchAgent",
"description": "On-demand research agent using web search",
"version": "1.0",
"skills": [
{ "id": "research", "name": "Research", "description": "...", "tags": ["search"] }
],
"defaultInputModes": ["application/json"],
"defaultOutputModes": ["application/json"],

// Queue endpoint (required for registration, returned on discovery)
"queueEndpoint": {
"technology": "rabbitmq", // "rabbitmq" | "azure-service-bus"
"host": "rabbitmq.example.com", // broker host (omit for Azure SB — use namespace)
"port": 5672, // optional; defaults: AMQP=5672, AMQPS=5671
"virtualHost": "/", // AMQP virtual host (default "/")
"exchange": "rockbot", // AMQP exchange name (topic exchange)
"taskTopic": "agent.task.ResearchAgent", // routing key callers publish tasks to
"responseTopic": "agent.response.{callerName}", // pattern callers subscribe to for results
"namespace": null, // Azure SB namespace (e.g. "mybus.servicebus.windows.net")
"entityPath": null // Azure SB queue or topic path
},

// Liveness (set by registry on discovery)
"isLive": true
}
```

---

## File Changes

### New files

```
src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/
Models/
QueuedAgentCard.cs # top-level card (mirrors A2A card + queueEndpoint)
QueueEndpoint.cs # queue connection details
QueuedA2AMapper.cs # Domain ↔ QueuedAgentCard, ProtocolMetadata storage
QueuedA2AEndpoints.cs # minimal API route registrations

tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/
QueuedA2AEndpointTests.cs # integration tests (WebApplicationFactory)
```

### Modified files

| File | Change |
|------|--------|
| `src/MarimerLLC.AgentRegistry.Api/Program.cs` | Call `app.MapQueuedA2AEndpoints()` |

---

## Implementation Tasks

### Task 1 — Models (`QueuedAgentCard`, `QueueEndpoint`)

Create `Protocols/QueuedA2A/Models/`:

- **`QueueEndpoint`** record:
- `Technology` (string — "rabbitmq" | "azure-service-bus")
- `Host` (string?)
- `Port` (int?)
- `VirtualHost` (string?) — AMQP
- `Exchange` (string?) — AMQP
- `TaskTopic` (string) — routing key / entity path clients publish to
- `ResponseTopic` (string?) — pattern for caller response subscription
- `Namespace` (string?) — Azure SB FQDN
- `EntityPath` (string?) — Azure SB queue or topic

- **`QueuedAgentCard`** record (mirrors A2A card fields relevant to queued agents):
- `Id`, `Name`, `Description`, `Version`
- `Skills` (list of `AgentSkill` — reuse from A2A models)
- `DefaultInputModes`, `DefaultOutputModes`
- `QueueEndpoint` (required on registration; present on discovery)
- `IsLive` (bool, set by registry on discovery)

### Task 2 — Mapper (`QueuedA2AMapper`)

```
FromCard(QueuedAgentCard) → MappedRegistration
- capabilities: each skill → RegisterCapabilityRequest
- endpoint:
Transport = technology == "azure-service-bus" ? AzureServiceBus : Amqp
Protocol = ProtocolType.A2A
Address = card.QueueEndpoint.TaskTopic
LivenessModel = Persistent
HeartbeatInterval = 30s
ProtocolMetadata = JSON of StoredQueuedA2AMetadata (all card + queueEndpoint fields)

ToCard(AgentWithLiveness) → QueuedAgentCard?
- filter endpoints: Protocol == A2A && Transport != Http
- deserialize ProtocolMetadata → StoredQueuedA2AMetadata
- reconstruct QueuedAgentCard faithfully; set IsLive from LiveEndpointIds
```

### Task 3 — Endpoints (`QueuedA2AEndpoints`)

Follow the ACP endpoints pattern closely:

```csharp
POST /a2a/async/agents (AgentOrAdmin auth)
GET /a2a/async/agents (public, paginated; ?capability=&tags=&liveOnly=&page=&pageSize=)
GET /a2a/async/agents/{id} (public)
```

Every route decorated with `.WithTags("QueuedA2A")`, `.WithSummary()`, `.WithDescription()`,
`.Produces<T>()`, `.ProducesProblem()`.

Discovery filter: `Protocol = A2A`, `Transport = Amqp | AzureServiceBus` (or filter in the
mapper by checking technology stored in metadata).

### Task 4 — Wire up in `Program.cs`

Add `app.MapQueuedA2AEndpoints();` after the other `Map*Endpoints()` calls.

### Task 5 — Integration tests

`tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/QueuedA2AEndpointTests.cs`:

- `Register_WithRabbitMQ_Returns201_WithId`
- `Register_WithAzureServiceBus_Returns201_WithId`
- `Register_MissingTaskTopic_Returns400`
- `Register_Unauthenticated_Returns401`
- `GetCard_ReturnsRegisteredCard_WithQueueEndpoint`
- `GetCard_UnknownId_Returns404`
- `ListCards_FiltersToQueuedAgents`
- `ListCards_LiveOnly_ExcludesDeadEndpoints`
- `RoundTrip_AllFields_PreservedOnDiscovery`

---

## Open Questions / Out of Scope (v1)

- **Kafka** transport — `TransportType` would need a new `Kafka` value; deferred.
- **TLS/auth parameters** — connection strings with credentials are sensitive; v1 omits them
from the card (clients are expected to have credentials out of band, as they do in rockbot).
- **Queued endpoint heartbeat/renew** — the standard `POST /agents/{id}/endpoints/{eid}/heartbeat`
route works unchanged; no new liveness plumbing needed.
- **Discovery endpoint that returns *both* HTTP and queued endpoints** — callers use the
existing generic `/agents` discovery or query protocol-specific endpoints; no aggregated view needed in v1.
Loading
Loading