From 055448304f60b29a7804c2a144a0317e1a4e56f3 Mon Sep 17 00:00:00 2001 From: Rockford Lhotka Date: Mon, 2 Mar 2026 21:49:39 -0600 Subject: [PATCH 1/2] Add QueuedA2A protocol adapter for async message-queue agents Agents that communicate via RabbitMQ or Azure Service Bus (using the A2A wire protocol over async messaging) can now register with the registry and be discovered by clients. New API surface: POST /a2a/async/agents - register an agent with queue endpoint details GET /a2a/async/agents - list/discover queued agents (paginated) GET /a2a/async/agents/{id} - retrieve a specific queued agent card The QueuedAgentCard extends the A2A card with a queueEndpoint object that carries broker connection details (technology, host, port, virtualHost, exchange, taskTopic, responseTopic for RabbitMQ; namespace and entityPath for Azure Service Bus). All fields round-trip faithfully through Endpoint.ProtocolMetadata (JSONB). ProtocolType stays A2A; TransportType .Amqp and .AzureServiceBus distinguish the broker technologies. 16 integration tests cover RabbitMQ and Azure Service Bus registration, discovery, 400/401/404 error cases, and full field round-tripping. Co-Authored-By: Claude Sonnet 4.6 --- docs/async-messaging-plan.md | 200 +++++++++++ src/MarimerLLC.AgentRegistry.Api/Program.cs | 2 + .../QueuedA2A/Models/QueueEndpoint.cs | 74 ++++ .../QueuedA2A/Models/QueuedAgentCard.cs | 50 +++ .../Protocols/QueuedA2A/QueuedA2AEndpoints.cs | 147 ++++++++ .../Protocols/QueuedA2A/QueuedA2AMapper.cs | 151 ++++++++ .../QueuedA2A/QueuedA2AEndpointTests.cs | 331 ++++++++++++++++++ 7 files changed, 955 insertions(+) create mode 100644 docs/async-messaging-plan.md create mode 100644 src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueueEndpoint.cs create mode 100644 src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueuedAgentCard.cs create mode 100644 src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AEndpoints.cs create mode 100644 src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AMapper.cs create mode 100644 tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/QueuedA2AEndpointTests.cs diff --git a/docs/async-messaging-plan.md b/docs/async-messaging-plan.md new file mode 100644 index 0000000..2d0462d --- /dev/null +++ b/docs/async-messaging-plan.md @@ -0,0 +1,200 @@ +# Async Messaging Support Plan + +## Background + +The A2A, MCP, and ACP protocol adapters in this registry assume synchronous HTTP transport. +The rockbot project demonstrates a different pattern: agents communicate via A2A **over async +message brokers** (RabbitMQ, Azure Service Bus, etc.), using fire-and-forget task publication +with a correlated async response channel. + +This plan adds first-class support for registering and discovering agents that are reachable +via a message queue rather than an HTTP endpoint. + +## Design Decisions + +### Reuse `ProtocolType.A2A`, add a new adapter + +The wire protocol between agents (task request/status/result message shapes) is still A2A. +What changes is the **transport**: instead of `POST /` over HTTP, callers publish a JSON +message to an exchange/topic on a broker. + +Rather than shoehorn queue connection details into the existing HTTP-oriented A2A adapter, +we add a dedicated **`Protocols/QueuedA2A/`** adapter with its own models and API surface. + +### Technology-agnostic model + +Supported queue technologies (first release): + +| Technology | `TransportType` | Key connection fields | +|------------------|--------------------------|----------------------------------------------------| +| RabbitMQ / AMQP | `Amqp` | host, port, virtualHost, exchange, taskRoutingKey | +| Azure Service Bus| `AzureServiceBus` | namespace (FQDN), entityPath | + +No domain enum changes are required; `TransportType.Amqp` and +`TransportType.AzureServiceBus` already exist. + +### Connection details stored in `ProtocolMetadata` + +All broker-specific fields are stored as JSONB in `Endpoint.ProtocolMetadata`, following the +same round-trip pattern used by the other adapters. The `Endpoint.Address` field carries the +primary queue/topic name (the address a client publishes task messages to). + +### New API surface + +``` +POST /a2a/async/agents Register an agent with queue endpoint details +GET /a2a/async/agents List/discover queued agents (filterable) +GET /a2a/async/agents/{id} Retrieve a specific queued agent card +``` + +Discovery endpoints are public. Registration requires `AgentOrAdmin` auth. + +--- + +## Queued Agent Card Model + +```jsonc +{ + // Standard A2A fields + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "name": "ResearchAgent", + "description": "On-demand research agent using web search", + "version": "1.0", + "skills": [ + { "id": "research", "name": "Research", "description": "...", "tags": ["search"] } + ], + "defaultInputModes": ["application/json"], + "defaultOutputModes": ["application/json"], + + // Queue endpoint (required for registration, returned on discovery) + "queueEndpoint": { + "technology": "rabbitmq", // "rabbitmq" | "azure-service-bus" + "host": "rabbitmq.example.com", // broker host (omit for Azure SB — use namespace) + "port": 5672, // optional; defaults: AMQP=5672, AMQPS=5671 + "virtualHost": "/", // AMQP virtual host (default "/") + "exchange": "rockbot", // AMQP exchange name (topic exchange) + "taskTopic": "agent.task.ResearchAgent", // routing key callers publish tasks to + "responseTopic": "agent.response.{callerName}", // pattern callers subscribe to for results + "namespace": null, // Azure SB namespace (e.g. "mybus.servicebus.windows.net") + "entityPath": null // Azure SB queue or topic path + }, + + // Liveness (set by registry on discovery) + "isLive": true +} +``` + +--- + +## File Changes + +### New files + +``` +src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/ + Models/ + QueuedAgentCard.cs # top-level card (mirrors A2A card + queueEndpoint) + QueueEndpoint.cs # queue connection details + QueuedA2AMapper.cs # Domain ↔ QueuedAgentCard, ProtocolMetadata storage + QueuedA2AEndpoints.cs # minimal API route registrations + +tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/ + QueuedA2AEndpointTests.cs # integration tests (WebApplicationFactory) +``` + +### Modified files + +| File | Change | +|------|--------| +| `src/MarimerLLC.AgentRegistry.Api/Program.cs` | Call `app.MapQueuedA2AEndpoints()` | + +--- + +## Implementation Tasks + +### Task 1 — Models (`QueuedAgentCard`, `QueueEndpoint`) + +Create `Protocols/QueuedA2A/Models/`: + +- **`QueueEndpoint`** record: + - `Technology` (string — "rabbitmq" | "azure-service-bus") + - `Host` (string?) + - `Port` (int?) + - `VirtualHost` (string?) — AMQP + - `Exchange` (string?) — AMQP + - `TaskTopic` (string) — routing key / entity path clients publish to + - `ResponseTopic` (string?) — pattern for caller response subscription + - `Namespace` (string?) — Azure SB FQDN + - `EntityPath` (string?) — Azure SB queue or topic + +- **`QueuedAgentCard`** record (mirrors A2A card fields relevant to queued agents): + - `Id`, `Name`, `Description`, `Version` + - `Skills` (list of `AgentSkill` — reuse from A2A models) + - `DefaultInputModes`, `DefaultOutputModes` + - `QueueEndpoint` (required on registration; present on discovery) + - `IsLive` (bool, set by registry on discovery) + +### Task 2 — Mapper (`QueuedA2AMapper`) + +``` +FromCard(QueuedAgentCard) → MappedRegistration + - capabilities: each skill → RegisterCapabilityRequest + - endpoint: + Transport = technology == "azure-service-bus" ? AzureServiceBus : Amqp + Protocol = ProtocolType.A2A + Address = card.QueueEndpoint.TaskTopic + LivenessModel = Persistent + HeartbeatInterval = 30s + ProtocolMetadata = JSON of StoredQueuedA2AMetadata (all card + queueEndpoint fields) + +ToCard(AgentWithLiveness) → QueuedAgentCard? + - filter endpoints: Protocol == A2A && Transport != Http + - deserialize ProtocolMetadata → StoredQueuedA2AMetadata + - reconstruct QueuedAgentCard faithfully; set IsLive from LiveEndpointIds +``` + +### Task 3 — Endpoints (`QueuedA2AEndpoints`) + +Follow the ACP endpoints pattern closely: + +```csharp +POST /a2a/async/agents (AgentOrAdmin auth) +GET /a2a/async/agents (public, paginated; ?capability=&tags=&liveOnly=&page=&pageSize=) +GET /a2a/async/agents/{id} (public) +``` + +Every route decorated with `.WithTags("QueuedA2A")`, `.WithSummary()`, `.WithDescription()`, +`.Produces()`, `.ProducesProblem()`. + +Discovery filter: `Protocol = A2A`, `Transport = Amqp | AzureServiceBus` (or filter in the +mapper by checking technology stored in metadata). + +### Task 4 — Wire up in `Program.cs` + +Add `app.MapQueuedA2AEndpoints();` after the other `Map*Endpoints()` calls. + +### Task 5 — Integration tests + +`tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/QueuedA2AEndpointTests.cs`: + +- `Register_WithRabbitMQ_Returns201_WithId` +- `Register_WithAzureServiceBus_Returns201_WithId` +- `Register_MissingTaskTopic_Returns400` +- `Register_Unauthenticated_Returns401` +- `GetCard_ReturnsRegisteredCard_WithQueueEndpoint` +- `GetCard_UnknownId_Returns404` +- `ListCards_FiltersToQueuedAgents` +- `ListCards_LiveOnly_ExcludesDeadEndpoints` +- `RoundTrip_AllFields_PreservedOnDiscovery` + +--- + +## Open Questions / Out of Scope (v1) + +- **Kafka** transport — `TransportType` would need a new `Kafka` value; deferred. +- **TLS/auth parameters** — connection strings with credentials are sensitive; v1 omits them + from the card (clients are expected to have credentials out of band, as they do in rockbot). +- **Queued endpoint heartbeat/renew** — the standard `POST /agents/{id}/endpoints/{eid}/heartbeat` + route works unchanged; no new liveness plumbing needed. +- **Discovery endpoint that returns *both* HTTP and queued endpoints** — callers use the + existing generic `/agents` discovery or query protocol-specific endpoints; no aggregated view needed in v1. diff --git a/src/MarimerLLC.AgentRegistry.Api/Program.cs b/src/MarimerLLC.AgentRegistry.Api/Program.cs index 927a8fa..bec6cae 100644 --- a/src/MarimerLLC.AgentRegistry.Api/Program.cs +++ b/src/MarimerLLC.AgentRegistry.Api/Program.cs @@ -4,6 +4,7 @@ using MarimerLLC.AgentRegistry.Api.Protocols.A2A; using MarimerLLC.AgentRegistry.Api.Protocols.ACP; using MarimerLLC.AgentRegistry.Api.Protocols.MCP; +using MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A; using ModelContextProtocol.Protocol; using ModelContextProtocol.Server; using MarimerLLC.AgentRegistry.Application.Agents; @@ -162,6 +163,7 @@ app.MapA2AEndpoints(); app.MapMcpEndpoints(); app.MapAcpEndpoints(); +app.MapQueuedA2AEndpoints(); if (builder.Configuration.GetValue("Database:AutoMigrate")) await app.Services.MigrateAsync(); diff --git a/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueueEndpoint.cs b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueueEndpoint.cs new file mode 100644 index 0000000..56e6230 --- /dev/null +++ b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueueEndpoint.cs @@ -0,0 +1,74 @@ +using System.Text.Json.Serialization; + +namespace MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A.Models; + +/// +/// Connection details for an agent reachable via an async message broker. +/// Clients publish A2A task request messages to and +/// subscribe on for results. +/// +public record QueueEndpoint +{ + /// + /// Identifies the broker technology. Supported values: rabbitmq, azure-service-bus. + /// + [JsonPropertyName("technology")] + public required string Technology { get; init; } + + // ── AMQP / RabbitMQ fields ──────────────────────────────────────────────── + + /// Broker hostname or IP address (e.g. rabbitmq.example.com). Used for AMQP brokers. + [JsonPropertyName("host")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Host { get; init; } + + /// Broker port. Defaults: AMQP=5672, AMQPS=5671. Omit to use the technology default. + [JsonPropertyName("port")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public int? Port { get; init; } + + /// AMQP virtual host (e.g. /). Only relevant for RabbitMQ / AMQP. + [JsonPropertyName("virtualHost")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? VirtualHost { get; init; } + + /// AMQP exchange name (e.g. rockbot). Only relevant for RabbitMQ / AMQP. + [JsonPropertyName("exchange")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Exchange { get; init; } + + // ── Routing ─────────────────────────────────────────────────────────────── + + /// + /// Routing key or topic path that callers publish A2A task requests to. + /// For RabbitMQ this is an AMQP routing key (e.g. agent.task.ResearchAgent). + /// For Azure Service Bus this is the queue or topic path (e.g. agent-tasks). + /// This is also stored in . + /// + [JsonPropertyName("taskTopic")] + public required string TaskTopic { get; init; } + + /// + /// Pattern that callers should subscribe to in order to receive responses from this agent. + /// May include a placeholder such as {callerName} that the caller substitutes with its + /// own identity (e.g. agent.response.{callerName}). + /// + [JsonPropertyName("responseTopic")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? ResponseTopic { get; init; } + + // ── Azure Service Bus fields ────────────────────────────────────────────── + + /// + /// Fully-qualified Azure Service Bus namespace hostname + /// (e.g. mybus.servicebus.windows.net). Only relevant for Azure Service Bus. + /// + [JsonPropertyName("namespace")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Namespace { get; init; } + + /// Azure Service Bus queue or topic path. Only relevant for Azure Service Bus. + [JsonPropertyName("entityPath")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? EntityPath { get; init; } +} diff --git a/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueuedAgentCard.cs b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueuedAgentCard.cs new file mode 100644 index 0000000..562b35c --- /dev/null +++ b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/Models/QueuedAgentCard.cs @@ -0,0 +1,50 @@ +using System.Text.Json.Serialization; +using MarimerLLC.AgentRegistry.Api.Protocols.A2A.Models; + +namespace MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A.Models; + +/// +/// An A2A agent card for agents that communicate via async message queues rather than HTTP. +/// The A2A wire protocol (task request / status update / result message shapes) is unchanged; +/// only the transport differs — callers publish messages to +/// on the named broker instead of sending HTTP requests. +/// +public record QueuedAgentCard +{ + [JsonPropertyName("name")] + public required string Name { get; init; } + + [JsonPropertyName("description")] + public required string Description { get; init; } + + [JsonPropertyName("version")] + public required string Version { get; init; } + + /// Skills (capabilities) this agent declares, in A2A format. + [JsonPropertyName("skills")] + public required IReadOnlyList Skills { get; init; } + + [JsonPropertyName("defaultInputModes")] + public required IReadOnlyList DefaultInputModes { get; init; } + + [JsonPropertyName("defaultOutputModes")] + public required IReadOnlyList DefaultOutputModes { get; init; } + + /// + /// Queue / broker connection details required to send tasks to this agent. + /// Must be supplied on registration; always present on discovery responses. + /// + [JsonPropertyName("queueEndpoint")] + public required QueueEndpoint QueueEndpoint { get; init; } + + // ── Registry-added discovery annotations ────────────────────────────────── + + /// Registry-assigned agent ID. Populated on discovery responses; omit when registering. + [JsonPropertyName("id")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? Id { get; init; } + + /// Whether the agent's queue endpoint is currently considered live by the registry. + [JsonPropertyName("isLive")] + public bool IsLive { get; init; } +} diff --git a/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AEndpoints.cs b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AEndpoints.cs new file mode 100644 index 0000000..359999d --- /dev/null +++ b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AEndpoints.cs @@ -0,0 +1,147 @@ +using System.Security.Claims; +using MarimerLLC.AgentRegistry.Api.Auth; +using MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A.Models; +using MarimerLLC.AgentRegistry.Application.Agents; +using MarimerLLC.AgentRegistry.Domain.Agents; + +namespace MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A; + +public static class QueuedA2AEndpoints +{ + public static IEndpointRouteBuilder MapQueuedA2AEndpoints(this IEndpointRouteBuilder app) + { + var group = app.MapGroup("/a2a/async").WithTags("QueuedA2A"); + + // Public discovery + group.MapGet("/agents/{id}", GetCard) + .WithName("QueuedA2AGetAgentCard") + .WithSummary("Get a queued A2A agent card") + .WithDescription( + "Returns the A2A agent card for an agent registered with a message-queue endpoint. " + + "The card includes the broker connection details (technology, host, exchange, topic) " + + "needed for a client to publish A2A task messages directly to the agent. " + + "Returns 404 if the agent exists but has no queued A2A endpoints.") + .Produces(StatusCodes.Status200OK) + .ProducesProblem(StatusCodes.Status400BadRequest) + .ProducesProblem(StatusCodes.Status404NotFound); + + group.MapGet("/agents", ListCards) + .WithName("QueuedA2AListAgents") + .WithSummary("List queued A2A agent cards") + .WithDescription( + "Returns a paginated list of A2A agent cards for agents with message-queue endpoints.\n\n" + + "**Filters**\n" + + "- `capability` — match agents that declare a capability with this exact name\n" + + "- `tags` — comma-separated list; agents must match all supplied tags\n" + + "- `liveOnly` — when `false`, includes agents with no live endpoints (default: `true`)\n" + + "- `page` / `pageSize` — 1-based page number and page size (max 100, default 20)") + .Produces(StatusCodes.Status200OK); + + // Authenticated registration + group.MapPost("/agents", RegisterViaCard) + .RequireAuthorization(RegistryPolicies.AgentOrAdmin) + .WithName("QueuedA2ARegisterAgent") + .WithSummary("Register a queued A2A agent") + .WithDescription( + "Registers an agent that communicates via an async message broker using the A2A protocol. " + + "Supply the A2A agent card along with the queue endpoint connection details. " + + "All card fields round-trip through protocol metadata for faithful reconstruction on discovery.") + .Produces(StatusCodes.Status201Created) + .ProducesProblem(StatusCodes.Status400BadRequest) + .ProducesProblem(StatusCodes.Status401Unauthorized); + + return app; + } + + private static async Task GetCard( + string id, + AgentService agentService, + CancellationToken ct) + { + if (!Guid.TryParse(id, out var guid)) + return Results.BadRequest("Invalid agent ID format."); + + var result = await agentService.GetByIdWithLivenessAsync(new AgentId(guid), ct); + if (result is null) return Results.NotFound(); + + var card = QueuedA2AMapper.ToCard(result); + if (card is null) + return Results.NotFound(new { error = $"Agent {id} has no queued A2A endpoints." }); + + return Results.Ok(card); + } + + private static async Task ListCards( + AgentService agentService, + string? capability = null, + string? tags = null, + bool liveOnly = true, + int page = 1, + int pageSize = 20, + CancellationToken ct = default) + { + pageSize = Math.Clamp(pageSize, 1, 100); + page = Math.Max(1, page); + + var tagList = tags?.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) + .ToList(); + + // Filter by A2A protocol; transport filter is omitted so both Amqp and AzureServiceBus + // are returned — the mapper filters out Http endpoints when building cards. + var filter = new AgentSearchFilter( + CapabilityName: capability, + Tags: tagList?.Count > 0 ? tagList : null, + Protocol: ProtocolType.A2A, + Transport: null, + LiveOnly: liveOnly, + Page: page, + PageSize: pageSize); + + var result = await agentService.DiscoverAsync(filter, ct); + + var cards = result.Items + .Select(QueuedA2AMapper.ToCard) + .Where(c => c is not null) + .ToList(); + + return Results.Ok(new + { + agents = cards, + totalCount = result.TotalCount, + page = result.Page, + pageSize = result.PageSize, + totalPages = result.TotalPages, + hasNextPage = result.HasNextPage, + }); + } + + private static async Task RegisterViaCard( + QueuedAgentCard card, + AgentService agentService, + ClaimsPrincipal user, + CancellationToken ct) + { + if (string.IsNullOrWhiteSpace(card.QueueEndpoint?.TaskTopic)) + return Results.BadRequest("queueEndpoint.taskTopic is required — the routing key or topic path clients publish task messages to."); + + if (string.IsNullOrWhiteSpace(card.QueueEndpoint?.Technology)) + return Results.BadRequest("queueEndpoint.technology is required (e.g. \"rabbitmq\" or \"azure-service-bus\")."); + + var ownerId = user.FindFirstValue(ClaimTypes.NameIdentifier)!; + var mapped = QueuedA2AMapper.FromCard(card); + + var agent = await agentService.RegisterAsync( + mapped.Name, + mapped.Description, + ownerId, + labels: null, + mapped.Capabilities, + mapped.Endpoints, + ct); + + var agentWithLiveness = await agentService.GetByIdWithLivenessAsync(agent.Id, ct); + var registered = QueuedA2AMapper.ToCard(agentWithLiveness!); + + return Results.Created($"/a2a/async/agents/{agent.Id}", registered); + } +} diff --git a/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AMapper.cs b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AMapper.cs new file mode 100644 index 0000000..f561dd1 --- /dev/null +++ b/src/MarimerLLC.AgentRegistry.Api/Protocols/QueuedA2A/QueuedA2AMapper.cs @@ -0,0 +1,151 @@ +using System.Text.Json; +using MarimerLLC.AgentRegistry.Api.Protocols.A2A.Models; +using MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A.Models; +using MarimerLLC.AgentRegistry.Application.Agents; +using MarimerLLC.AgentRegistry.Domain.Agents; + +namespace MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A; + +public static class QueuedA2AMapper +{ + private static readonly IReadOnlyList DefaultModes = ["application/json"]; + + // ── Domain → QueuedAgentCard ────────────────────────────────────────────── + + /// + /// Build a from a registry agent. + /// Returns null if the agent has no A2A endpoints with a non-HTTP transport. + /// + public static QueuedAgentCard? ToCard(AgentWithLiveness agentWithLiveness) + { + var agent = agentWithLiveness.Agent; + + var queuedEndpoints = agent.Endpoints + .Where(e => e.Protocol == ProtocolType.A2A && e.Transport != TransportType.Http) + .ToList(); + + if (queuedEndpoints.Count == 0) return null; + + // Prefer a live endpoint; fall back to the first registered. + var primary = queuedEndpoints.FirstOrDefault(e => + agentWithLiveness.LiveEndpointIds.Contains(e.Id)) ?? queuedEndpoints[0]; + + StoredQueuedA2AMetadata? stored = null; + if (!string.IsNullOrWhiteSpace(primary.ProtocolMetadata)) + { + try + { + stored = JsonSerializer.Deserialize( + primary.ProtocolMetadata, + JsonSerializerOptions.Web); + } + catch (JsonException) { /* build from domain only */ } + } + + // Prefer the stored skills verbatim — they preserve original IDs and all metadata. + // Fall back to building from domain capabilities only when no stored skills exist. + var skills = stored?.Skills is { Count: > 0 } + ? stored.Skills + : agent.Capabilities.Select(c => new AgentSkill + { + Id = c.Id.ToString(), + Name = c.Name, + Description = c.Description ?? c.Name, + Tags = c.Tags.ToList(), + }).ToList(); + + var queueEndpoint = stored?.QueueEndpoint ?? new Models.QueueEndpoint + { + Technology = ToTechnology(primary.Transport), + TaskTopic = primary.Address, + }; + + return new QueuedAgentCard + { + Id = agent.Id.ToString(), + Name = agent.Name, + Description = agent.Description ?? agent.Name, + Version = stored?.Version ?? "1.0", + Skills = skills, + DefaultInputModes = stored?.DefaultInputModes ?? DefaultModes, + DefaultOutputModes = stored?.DefaultOutputModes ?? DefaultModes, + QueueEndpoint = queueEndpoint, + IsLive = agentWithLiveness.LiveEndpointIds.Contains(primary.Id), + }; + } + + // ── QueuedAgentCard → domain ────────────────────────────────────────────── + + public record MappedRegistration( + string Name, + string Description, + IEnumerable Capabilities, + IEnumerable Endpoints); + + /// + /// Map a to the inputs needed for + /// . All card and queue endpoint + /// fields are serialised into ProtocolMetadata so discovery can + /// reconstruct the card exactly. + /// + public static MappedRegistration FromCard(QueuedAgentCard card) + { + var capabilities = card.Skills.Select(s => + new RegisterCapabilityRequest(s.Name, s.Description, s.Tags)); + + var metadata = JsonSerializer.Serialize(new StoredQueuedA2AMetadata + { + Version = card.Version, + Skills = card.Skills.ToList(), + DefaultInputModes = card.DefaultInputModes.ToList(), + DefaultOutputModes = card.DefaultOutputModes.ToList(), + QueueEndpoint = card.QueueEndpoint, + }, JsonSerializerOptions.Web); + + var transport = FromTechnology(card.QueueEndpoint.Technology); + + var endpoints = new[] + { + new RegisterEndpointRequest( + Name: $"async-{card.QueueEndpoint.Technology}", + Transport: transport, + Protocol: ProtocolType.A2A, + Address: card.QueueEndpoint.TaskTopic, + LivenessModel: LivenessModel.Persistent, + TtlDuration: null, + HeartbeatInterval: TimeSpan.FromSeconds(30), + ProtocolMetadata: metadata), + }; + + return new MappedRegistration(card.Name, card.Description, capabilities, endpoints); + } + + // ── Helpers ─────────────────────────────────────────────────────────────── + + private static string ToTechnology(TransportType transport) => transport switch + { + TransportType.AzureServiceBus => "azure-service-bus", + _ => "rabbitmq", + }; + + private static TransportType FromTechnology(string technology) => technology switch + { + "azure-service-bus" => TransportType.AzureServiceBus, + _ => TransportType.Amqp, + }; + + // ── Stored metadata shape ───────────────────────────────────────────────── + + /// + /// All card fields that have no equivalent in the generic domain model. + /// Serialised into for round-tripping. + /// + private record StoredQueuedA2AMetadata + { + public string Version { get; init; } = "1.0"; + public List? Skills { get; init; } + public List? DefaultInputModes { get; init; } + public List? DefaultOutputModes { get; init; } + public Models.QueueEndpoint? QueueEndpoint { get; init; } + } +} diff --git a/tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/QueuedA2AEndpointTests.cs b/tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/QueuedA2AEndpointTests.cs new file mode 100644 index 0000000..d018a53 --- /dev/null +++ b/tests/AgentRegistry.Api.Tests/Protocols/QueuedA2A/QueuedA2AEndpointTests.cs @@ -0,0 +1,331 @@ +using System.Net; +using System.Net.Http.Json; +using System.Text.Json; +using MarimerLLC.AgentRegistry.Api.Agents.Models; +using MarimerLLC.AgentRegistry.Api.Protocols.QueuedA2A.Models; +using MarimerLLC.AgentRegistry.Api.Tests.Infrastructure; +using MarimerLLC.AgentRegistry.Domain.Agents; + +namespace MarimerLLC.AgentRegistry.Api.Tests.Protocols.QueuedA2A; + +public class QueuedA2AEndpointTests(AgentRegistryFactory factory) + : IClassFixture, IDisposable +{ + private readonly HttpClient _admin = factory.CreateAdminClient(); + private readonly HttpClient _agent = factory.CreateAgentClient(); + private readonly HttpClient _anon = factory.CreateClient(); + + public void Dispose() => factory.Reset(); + + // ── POST /a2a/async/agents ──────────────────────────────────────────────── + + [Fact] + public async Task Register_WithRabbitMQ_Returns201_WithId() + { + var card = BuildRabbitMqCard("research-agent"); + + var response = await _agent.PostAsJsonAsync("/a2a/async/agents", card); + + Assert.Equal(HttpStatusCode.Created, response.StatusCode); + Assert.NotNull(response.Headers.Location); + + var result = await response.Content.ReadFromJsonAsync(); + Assert.NotNull(result); + Assert.NotNull(result.Id); + Assert.Equal("research-agent", result.Name); + Assert.Equal("rabbitmq", result.QueueEndpoint.Technology); + Assert.Equal("agent.task.ResearchAgent", result.QueueEndpoint.TaskTopic); + } + + [Fact] + public async Task Register_WithAzureServiceBus_Returns201_WithId() + { + var card = BuildAzureServiceBusCard("azure-agent"); + + var response = await _agent.PostAsJsonAsync("/a2a/async/agents", card); + + Assert.Equal(HttpStatusCode.Created, response.StatusCode); + + var result = await response.Content.ReadFromJsonAsync(); + Assert.NotNull(result); + Assert.NotNull(result.Id); + Assert.Equal("azure-agent", result.Name); + Assert.Equal("azure-service-bus", result.QueueEndpoint.Technology); + Assert.Equal("agent-tasks", result.QueueEndpoint.TaskTopic); + } + + [Fact] + public async Task Register_MissingTaskTopic_Returns400() + { + var card = BuildRabbitMqCard("bad-agent") with + { + QueueEndpoint = new QueueEndpoint + { + Technology = "rabbitmq", + Host = "rabbitmq.example.com", + TaskTopic = "", // empty — invalid + } + }; + + var response = await _agent.PostAsJsonAsync("/a2a/async/agents", card); + + Assert.Equal(HttpStatusCode.BadRequest, response.StatusCode); + } + + [Fact] + public async Task Register_Unauthenticated_Returns401() + { + var card = BuildRabbitMqCard("unauth-agent"); + + var response = await _anon.PostAsJsonAsync("/a2a/async/agents", card); + + Assert.Equal(HttpStatusCode.Unauthorized, response.StatusCode); + } + + [Fact] + public async Task Register_AsAdminClient_Returns201() + { + var card = BuildRabbitMqCard("admin-registered"); + + var response = await _admin.PostAsJsonAsync("/a2a/async/agents", card); + + Assert.Equal(HttpStatusCode.Created, response.StatusCode); + } + + // ── GET /a2a/async/agents/{id} ──────────────────────────────────────────── + + [Fact] + public async Task GetCard_ReturnsRegisteredCard_WithQueueEndpoint() + { + var registered = await RegisterAsync(BuildRabbitMqCard("get-test")); + + var response = await _anon.GetAsync($"/a2a/async/agents/{registered.Id}"); + + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + + var card = await response.Content.ReadFromJsonAsync(); + Assert.NotNull(card); + Assert.Equal("get-test", card.Name); + Assert.NotNull(card.QueueEndpoint); + Assert.Equal("rabbitmq", card.QueueEndpoint.Technology); + } + + [Fact] + public async Task GetCard_IsPublic() + { + var registered = await RegisterAsync(BuildRabbitMqCard("public-card")); + var response = await _anon.GetAsync($"/a2a/async/agents/{registered.Id}"); + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + } + + [Fact] + public async Task GetCard_UnknownId_Returns404() + { + var response = await _anon.GetAsync($"/a2a/async/agents/{Guid.NewGuid()}"); + Assert.Equal(HttpStatusCode.NotFound, response.StatusCode); + } + + [Fact] + public async Task GetCard_InvalidIdFormat_Returns400() + { + var response = await _anon.GetAsync("/a2a/async/agents/not-a-guid"); + Assert.Equal(HttpStatusCode.BadRequest, response.StatusCode); + } + + [Fact] + public async Task GetCard_AgentWithOnlyHttpEndpoints_Returns404() + { + // Register a plain HTTP A2A agent via the generic endpoint — it has no queued endpoints. + var req = new RegisterAgentRequest("Http A2A Only", null, null, null, + [new EndpointRequest("ep", TransportType.Http, ProtocolType.A2A, + "https://example.com/a2a", LivenessModel.Persistent, null, 30, null)]); + var resp = await _admin.PostAsJsonAsync("/agents", req); + resp.EnsureSuccessStatusCode(); + var created = await resp.Content.ReadFromJsonAsync(); + + var response = await _anon.GetAsync($"/a2a/async/agents/{created!.Id}"); + Assert.Equal(HttpStatusCode.NotFound, response.StatusCode); + } + + [Fact] + public async Task GetCard_IncludesLivenessStatus() + { + var registered = await RegisterAsync(BuildRabbitMqCard("live-check")); + var card = await _anon.GetFromJsonAsync($"/a2a/async/agents/{registered.Id}"); + Assert.NotNull(card); + Assert.True(card.IsLive); + } + + // ── GET /a2a/async/agents ───────────────────────────────────────────────── + + [Fact] + public async Task ListCards_IsPublic() + { + var response = await _anon.GetAsync("/a2a/async/agents"); + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + } + + [Fact] + public async Task ListCards_FiltersToQueuedAgents() + { + await RegisterAsync(BuildRabbitMqCard("queued-1")); + await RegisterAsync(BuildRabbitMqCard("queued-2")); + + // Register an HTTP A2A agent — must NOT appear in the queued list. + var req = new RegisterAgentRequest("Http A2A", null, null, null, + [new EndpointRequest("ep", TransportType.Http, ProtocolType.A2A, + "https://example.com/a2a", LivenessModel.Persistent, null, 30, null)]); + await _admin.PostAsJsonAsync("/agents", req); + + var result = await _anon.GetFromJsonAsync("/a2a/async/agents?liveOnly=true"); + Assert.NotNull(result); + + var agents = result.RootElement.GetProperty("agents").EnumerateArray().ToList(); + Assert.Equal(2, agents.Count); + Assert.All(agents, a => Assert.NotEqual("Http A2A", a.GetProperty("name").GetString())); + } + + [Fact] + public async Task ListCards_PaginationWorks() + { + for (var i = 0; i < 5; i++) + await RegisterAsync(BuildRabbitMqCard($"paged-{i:D2}")); + + var result = await _anon.GetFromJsonAsync("/a2a/async/agents?liveOnly=true&page=1&pageSize=3"); + Assert.NotNull(result); + + var agents = result.RootElement.GetProperty("agents").EnumerateArray().ToList(); + Assert.Equal(3, agents.Count); + Assert.Equal(5, result.RootElement.GetProperty("totalCount").GetInt32()); + Assert.True(result.RootElement.GetProperty("hasNextPage").GetBoolean()); + } + + // ── Round-trip ──────────────────────────────────────────────────────────── + + [Fact] + public async Task RoundTrip_AllFields_PreservedOnDiscovery() + { + var original = new QueuedAgentCard + { + Name = "FullAgent", + Description = "Full round-trip test agent", + Version = "2.1", + Skills = + [ + new() { Id = "research", Name = "Research", Description = "Web research", Tags = ["search", "web"] }, + new() { Id = "summarize", Name = "Summarize", Description = "Text summarization", Tags = ["nlp"] }, + ], + DefaultInputModes = ["application/json", "text/plain"], + DefaultOutputModes = ["application/json"], + QueueEndpoint = new QueueEndpoint + { + Technology = "rabbitmq", + Host = "rabbitmq.prod.example.com", + Port = 5672, + VirtualHost = "/rockbot", + Exchange = "rockbot", + TaskTopic = "agent.task.FullAgent", + ResponseTopic = "agent.response.{callerName}", + }, + }; + + var createResp = await _agent.PostAsJsonAsync("/a2a/async/agents", original); + createResp.EnsureSuccessStatusCode(); + var created = await createResp.Content.ReadFromJsonAsync(); + + var fetched = await _anon.GetFromJsonAsync($"/a2a/async/agents/{created!.Id}"); + Assert.NotNull(fetched); + + Assert.Equal(original.Name, fetched.Name); + Assert.Equal(original.Description, fetched.Description); + Assert.Equal(original.Version, fetched.Version); + Assert.Equal(original.DefaultInputModes, fetched.DefaultInputModes); + Assert.Equal(original.DefaultOutputModes, fetched.DefaultOutputModes); + + var q = fetched.QueueEndpoint; + Assert.Equal("rabbitmq", q.Technology); + Assert.Equal("rabbitmq.prod.example.com", q.Host); + Assert.Equal(5672, q.Port); + Assert.Equal("/rockbot", q.VirtualHost); + Assert.Equal("rockbot", q.Exchange); + Assert.Equal("agent.task.FullAgent", q.TaskTopic); + Assert.Equal("agent.response.{callerName}", q.ResponseTopic); + + Assert.Equal(2, fetched.Skills.Count); + var skillIds = fetched.Skills.Select(s => s.Id).ToHashSet(); + Assert.Contains("research", skillIds); + Assert.Contains("summarize", skillIds); + } + + [Fact] + public async Task RoundTrip_AzureServiceBus_AllFieldsPreserved() + { + var original = BuildAzureServiceBusCard("azure-roundtrip"); + + var createResp = await _agent.PostAsJsonAsync("/a2a/async/agents", original); + createResp.EnsureSuccessStatusCode(); + var created = await createResp.Content.ReadFromJsonAsync(); + + var fetched = await _anon.GetFromJsonAsync($"/a2a/async/agents/{created!.Id}"); + Assert.NotNull(fetched); + + var q = fetched.QueueEndpoint; + Assert.Equal("azure-service-bus", q.Technology); + Assert.Equal("mybus.servicebus.windows.net", q.Namespace); + Assert.Equal("agent-tasks", q.EntityPath); + Assert.Equal("agent-tasks", q.TaskTopic); + } + + // ── Helpers ─────────────────────────────────────────────────────────────── + + private async Task RegisterAsync(QueuedAgentCard card) + { + var response = await _agent.PostAsJsonAsync("/a2a/async/agents", card); + response.EnsureSuccessStatusCode(); + return (await response.Content.ReadFromJsonAsync())!; + } + + private static QueuedAgentCard BuildRabbitMqCard(string name) => new() + { + Name = name, + Description = $"{name} description", + Version = "1.0", + Skills = + [ + new() { Id = "default", Name = name, Description = $"{name} skill", Tags = ["async"] }, + ], + DefaultInputModes = ["application/json"], + DefaultOutputModes = ["application/json"], + QueueEndpoint = new QueueEndpoint + { + Technology = "rabbitmq", + Host = "rabbitmq.example.com", + Port = 5672, + VirtualHost = "/", + Exchange = "rockbot", + TaskTopic = "agent.task.ResearchAgent", + ResponseTopic = "agent.response.{callerName}", + }, + }; + + private static QueuedAgentCard BuildAzureServiceBusCard(string name) => new() + { + Name = name, + Description = $"{name} description", + Version = "1.0", + Skills = + [ + new() { Id = "default", Name = name, Description = $"{name} skill", Tags = ["async"] }, + ], + DefaultInputModes = ["application/json"], + DefaultOutputModes = ["application/json"], + QueueEndpoint = new QueueEndpoint + { + Technology = "azure-service-bus", + Namespace = "mybus.servicebus.windows.net", + EntityPath = "agent-tasks", + TaskTopic = "agent-tasks", + ResponseTopic = "agent-response", + }, + }; +} From feb0bafff6f47b53fbcb5dee9b6a4015f4bb789b Mon Sep 17 00:00:00 2001 From: Rockford Lhotka Date: Mon, 2 Mar 2026 21:52:34 -0600 Subject: [PATCH 2/2] Document Queued A2A adapter in README and /docs Add docs/protocol-queued-a2a.md covering: broker technologies (RabbitMQ, Azure Service Bus), the QueuedAgentCard model and queueEndpoint fields, design decisions (why ProtocolType stays A2A, separate adapter rationale, credentials-not-stored policy, skills round-trip), and registration flow examples for both broker types with KEDA liveness patterns. Update README: - Add QueuedA2A to protocol support section with API surface summary - Extend Queue-backed agents section to show both registration paths - Add /a2a/async/agents rows to the API overview table - Update auth section's public-endpoint list - Update project structure to reflect QueuedA2A/ folders - Add link to new docs page alongside A2A/MCP/ACP Co-Authored-By: Claude Sonnet 4.6 --- README.md | 76 ++++++++++-- docs/protocol-queued-a2a.md | 239 ++++++++++++++++++++++++++++++++++++ 2 files changed, 305 insertions(+), 10 deletions(-) create mode 100644 docs/protocol-queued-a2a.md diff --git a/README.md b/README.md index b1568ed..1f671d4 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ AgentRegistry is the connective tissue. An agent registers itself when it starts │ AgentRegistry.Api │ │ ASP.NET Core 10 · Minimal APIs · Scalar UI · OpenTelemetry │ │ Auth: API key (Admin/Agent scopes) + JWT Bearer │ -│ Protocol adapters: A2A · MCP │ +│ Protocol adapters: A2A · MCP · ACP · Queued A2A │ └────────────┬───────────────────────┬────────────────────────────┘ │ │ ┌────────────▼───────┐ ┌───────────▼────────────────────────────┐ @@ -60,6 +60,7 @@ Detailed design rationale for each adapter is in [`/docs`](docs/): - [A2A adapter design](docs/protocol-a2a.md) - [MCP adapter design](docs/protocol-mcp.md) - [ACP adapter design](docs/protocol-acp.md) +- [Queued A2A adapter design](docs/protocol-queued-a2a.md) ### A2A (Agent-to-Agent) @@ -96,6 +97,18 @@ Targets [ACP spec 0.2.0](https://github.com/i-am-bee/acp) (IBM Research / BeeAI) The manifest carries MIME-typed content types, JSON Schema for input/output/config/thread state, performance status metrics, and rich metadata (framework, natural languages, license, author). All fields round-trip through `Endpoint.ProtocolMetadata`. Agent names are normalised to RFC 1123 DNS-label format on manifest generation. +### Queued A2A (A2A over async message brokers) + +Agents that communicate via **RabbitMQ** or **Azure Service Bus** rather than HTTP. The A2A wire protocol (task request / status update / result message shapes) is unchanged — only the transport differs. Callers publish A2A task messages to a named topic on the broker and receive responses asynchronously, enabling KEDA-scaled workers, long-running tasks, and decoupled agent pipelines. + +- `GET /a2a/async/agents/{id}` — queued A2A agent card with full broker connection details (public) +- `GET /a2a/async/agents` — filtered list of queued agents (public) +- `POST /a2a/async/agents` — register by submitting a queued agent card (Agent or Admin) + +The `queueEndpoint` object in each card carries `technology` (`"rabbitmq"` or `"azure-service-bus"`), `host`, `port`, `virtualHost`, `exchange`, `taskTopic` (where callers publish), and `responseTopic` (where callers listen for replies). For Azure Service Bus, `namespace` and `entityPath` replace the AMQP-specific fields. + +Queued agents also appear in `GET /discover/agents?protocol=A2A` alongside HTTP A2A agents since they share the same protocol type. See [Queued A2A adapter design](docs/protocol-queued-a2a.md) for the full design rationale. + ### Generic (protocol-agnostic) All protocols can also be registered and discovered through the generic API, which returns the registry's own domain model rather than protocol-native card formats. @@ -230,7 +243,7 @@ The registry accepts two authentication methods, selected by header: - A `registry_scope` claim with value `Admin` or `Agent` - A `roles` claim with value `registry.admin` or `registry.agent` -Discovery, the MCP server endpoint, and protocol card endpoints (`/discover/agents`, `/mcp`, `/a2a/agents/*`, `/mcp/servers/*`) are always public — no auth required. +Discovery, the MCP server endpoint, and protocol card endpoints (`/discover/agents`, `/mcp`, `/a2a/agents/*`, `/a2a/async/agents/*`, `/mcp/servers/*`, `/acp/agents/*`) are always public — no auth required. ## API overview @@ -273,6 +286,14 @@ Discovery, the MCP server endpoint, and protocol card endpoints (`/discover/agen | `GET` | `/acp/agents` | Public | Filtered list of ACP agent manifests | | `POST` | `/acp/agents` | Agent or Admin | Register by submitting an ACP agent manifest | +### Queued A2A protocol + +| Method | Path | Auth | Description | +|---|---|---|---| +| `GET` | `/a2a/async/agents/{id}` | Public | Queued A2A card with broker connection details | +| `GET` | `/a2a/async/agents` | Public | Filtered list of queued A2A agent cards | +| `POST` | `/a2a/async/agents` | Agent or Admin | Register an agent with queue endpoint details | + ### Key management and system | Method | Path | Auth | Description | @@ -354,20 +375,53 @@ The two services are non-conflicting — if a config-defined agent also self-reg ## Queue-backed agents -Agents using AMQP or Azure Service Bus don't need to be running when discovered. The registry stores the queue address as the endpoint: +Agents using AMQP or Azure Service Bus don't need to be running when discovered. The registry stores the queue address as the endpoint. There are two ways to register a queue-backed agent: + +### Queued A2A (recommended for A2A agents over a broker) + +Use `POST /a2a/async/agents` with the full `queueEndpoint` connection details: + +```json +{ + "name": "ResearchAgent", + "description": "On-demand research agent", + "version": "1.0", + "skills": [{ "id": "research", "name": "Research", "description": "Researches a topic", "tags": ["search"] }], + "defaultInputModes": ["application/json"], + "defaultOutputModes": ["application/json"], + "queueEndpoint": { + "technology": "rabbitmq", + "host": "rabbitmq.example.com", + "port": 5672, + "virtualHost": "/", + "exchange": "rockbot", + "taskTopic": "agent.task.ResearchAgent", + "responseTopic": "agent.response.{callerName}" + } +} +``` + +Clients discover the agent via `GET /a2a/async/agents` and receive the full broker connection details needed to publish A2A task messages directly. + +### Generic registration + +Use `POST /agents` with explicit `transport` and `protocol` fields. This works for any protocol/transport combination but returns the registry's internal model rather than a protocol-native card: ```json { "name": "async-processor", - "transport": "AzureServiceBus", - "protocol": "A2A", - "address": "agents/summarizer/requests", - "livenessModel": "Ephemeral", - "ttlSeconds": 60 + "endpoints": [{ + "name": "queue", + "transport": "AzureServiceBus", + "protocol": "A2A", + "address": "agents/summarizer/requests", + "livenessModel": "Ephemeral", + "ttlSeconds": 60 + }] } ``` -A KEDA-scaled worker registers on startup, processes jobs, and the TTL expires naturally when the scaling group idles. Consumers route work to the queue address — whether the worker is currently running or not is KEDA's concern. +In both cases, a KEDA-scaled worker registers on startup, processes jobs, and the TTL expires naturally when the scaling group idles. Consumers route work to the queue address — whether the worker is currently running or not is KEDA's concern. See [Queued A2A adapter design](docs/protocol-queued-a2a.md) for the full pattern including liveness and round-tripping. ## Configuration reference @@ -429,9 +483,10 @@ src/ AgentRegistry.Infrastructure/ EF Core (PostgreSQL), Redis, SQL API key service AgentRegistry.Api/ ASP.NET Core 10 minimal API, auth, Scalar Protocols/ - A2A/ A2A v1.0 RC agent card adapter + A2A/ A2A v1.0 RC agent card adapter (HTTP) MCP/ MCP 2025-11-25 server card adapter (Streamable HTTP) ACP/ ACP 0.2.0 agent manifest adapter + QueuedA2A/ A2A over async message brokers (RabbitMQ, Azure Service Bus) tests/ AgentRegistry.Domain.Tests/ Domain unit tests AgentRegistry.Application.Tests/ Service tests using Rocks source-gen mocks @@ -440,6 +495,7 @@ tests/ A2A/ A2A endpoint tests MCP/ MCP endpoint tests ACP/ ACP endpoint tests + QueuedA2A/ Queued A2A endpoint tests k8s/ redis.yaml Redis StatefulSet + Service agentregistry/ diff --git a/docs/protocol-queued-a2a.md b/docs/protocol-queued-a2a.md new file mode 100644 index 0000000..be8c7a1 --- /dev/null +++ b/docs/protocol-queued-a2a.md @@ -0,0 +1,239 @@ +# Queued A2A Protocol Adapter + +## What is Queued A2A? + +Queued A2A describes agents that use the **A2A wire protocol over an async message broker** instead of HTTP. The message format — task requests, status updates, results — is identical to standard A2A. What changes is the transport: callers publish messages to a queue or topic on a broker (RabbitMQ, Azure Service Bus) and receive responses asynchronously on a reply topic, rather than making a synchronous HTTP call. + +This pattern is common in: + +- **KEDA-scaled workers** — agents that are scaled to zero when idle. A KEDA trigger watches the queue depth; the worker spins up when messages arrive. +- **Long-running tasks** — research, code generation, batch processing where a 30-second HTTP timeout is impractical. +- **Decoupled pipelines** — agent chains where intermediate results pass through a broker, enabling retries, dead-lettering, and fan-out without tight coupling between agents. + +The rockbot project demonstrates this pattern concretely: `SampleAgent` and `ResearchAgent` run as KEDA-scalable workers, communicate via RabbitMQ, and use A2A message shapes for all task exchange. + +## Supported broker technologies + +| Technology | `TransportType` | Key fields | +|---|---|---| +| RabbitMQ (AMQP 0-9-1) | `Amqp` | `host`, `port`, `virtualHost`, `exchange`, `taskTopic` | +| Azure Service Bus | `AzureServiceBus` | `namespace`, `entityPath`, `taskTopic` | + +## The QueuedAgentCard + +The discovery and registration format is a `QueuedAgentCard` — an A2A agent card extended with a `queueEndpoint` object: + +```json +{ + "name": "ResearchAgent", + "description": "On-demand research agent using web search and page fetching", + "version": "1.0", + "skills": [ + { + "id": "research", + "name": "Research", + "description": "Research a topic using web search", + "tags": ["search", "web"] + } + ], + "defaultInputModes": ["application/json"], + "defaultOutputModes": ["application/json"], + "queueEndpoint": { + "technology": "rabbitmq", + "host": "rabbitmq.example.com", + "port": 5672, + "virtualHost": "/", + "exchange": "rockbot", + "taskTopic": "agent.task.ResearchAgent", + "responseTopic": "agent.response.{callerName}" + }, + "id": "3fa85f64-...", + "isLive": true +} +``` + +### `queueEndpoint` fields + +| Field | Required | Description | +|---|---|---| +| `technology` | Yes | `"rabbitmq"` or `"azure-service-bus"` | +| `taskTopic` | Yes | Routing key or topic path that callers publish task messages to | +| `host` | RabbitMQ | Broker hostname (e.g. `rabbitmq.example.com`) | +| `port` | No | Broker port; defaults to 5672 (AMQP) or 5671 (AMQPS) if omitted | +| `virtualHost` | No | AMQP virtual host; typically `/` | +| `exchange` | No | AMQP exchange name (topic exchange, e.g. `rockbot`) | +| `responseTopic` | No | Pattern callers subscribe to for responses (e.g. `agent.response.{callerName}`) | +| `namespace` | Azure SB | Fully-qualified Service Bus namespace (e.g. `mybus.servicebus.windows.net`) | +| `entityPath` | Azure SB | Service Bus queue or topic path | + +### Registry-added fields + +`id` and `isLive` are added by the registry on discovery responses. Omit them when registering. + +## API endpoints + +| Method | Path | Auth | Description | +|---|---|---|---| +| `POST` | `/a2a/async/agents` | Agent or Admin | Register an agent with queue endpoint details | +| `GET` | `/a2a/async/agents` | Public | List / discover queued agents (paginated) | +| `GET` | `/a2a/async/agents/{id}` | Public | Retrieve a specific queued agent card | + +The list endpoint supports the same query parameters as other protocol list endpoints: `capability`, `tags`, `liveOnly` (default `true`), `page`, `pageSize` (max 100). + +## Design decisions + +### ProtocolType stays A2A + +The A2A message shapes (task request, status update, result) are unchanged. Only the transport layer differs. Using a new `ProtocolType` would fragment discovery — a consumer searching for A2A agents would miss queue-backed ones. Keeping `ProtocolType.A2A` means `GET /discover/agents?protocol=A2A` returns both HTTP and queued A2A agents. + +`TransportType` distinguishes them: `Http` for classic A2A over HTTP, `Amqp` for RabbitMQ, `AzureServiceBus` for Azure. + +### Separate adapter from the HTTP A2A adapter + +The HTTP A2A adapter serves and accepts standard A2A agent cards where `supportedInterfaces[].url` is an HTTP URL. For queued agents, the relevant connection information (exchange, routing key, virtual host) doesn't fit cleanly into a URL field. A dedicated `/a2a/async/agents` surface with a `queueEndpoint` object is clearer for consumers than trying to encode broker details into a URL. + +The existing `GET /a2a/agents/{id}` endpoint continues to work for HTTP A2A agents and will return a synthetic placeholder URL for any non-HTTP endpoints it encounters. The dedicated queued endpoint is the intended surface for agents that are truly queue-native. + +### Connection details in ProtocolMetadata + +All `queueEndpoint` fields and the full card (version, skills, I/O modes) are serialised into `Endpoint.ProtocolMetadata` at registration time. On discovery, the mapper deserialises them to reconstruct the original card exactly. This is the same round-trip strategy used by A2A, MCP, and ACP — no domain model changes required for new fields. + +`Endpoint.Address` holds `taskTopic` — the routing key / entity path where callers publish. This is the primary "address" of the agent from the registry's perspective, analogous to an HTTP URL. + +### No credentials in the card + +Connection strings with usernames, passwords, or SAS tokens are not stored. The card contains only the structural connection details (host, port, exchange, topic). Callers are expected to hold broker credentials separately — via Kubernetes secrets, Azure Key Vault, or equivalent — and combine them with the structural details from the registry. + +### Skills and domain capabilities + +Skills in the `QueuedAgentCard` map directly to registry capabilities, enabling queued agents to be discovered through the generic `GET /discover/agents?capability=research` endpoint alongside HTTP agents. When an agent registers with multiple skills, all skills become capabilities in the domain model. + +On discovery, the stored skills are returned verbatim (preserving original string IDs like `"research"` rather than internal Guid IDs) because they are read from `ProtocolMetadata`, not reconstructed from capabilities. + +### isLive reflects heartbeat state + +Queued agents typically use the `Persistent` liveness model and call `POST /agents/{id}/endpoints/{eid}/heartbeat` periodically. KEDA-scaled workers may use `Ephemeral` liveness — registering when a pod starts, calling `POST /agents/{id}/endpoints/{eid}/renew` on each task invocation, and relying on TTL expiry when the pod scales to zero. + +`isLive: false` does not mean the agent's queue is unavailable — it means the registry has not seen a heartbeat or renewal within the grace period. Consumers can choose to route work to stale endpoints if they know the queue is durable. + +## Registration flows + +### RabbitMQ agent + +```json +POST /a2a/async/agents +Authorization: X-Api-Key: ar_... + +{ + "name": "ResearchAgent", + "description": "On-demand research agent", + "version": "1.0", + "skills": [ + { "id": "research", "name": "Research", "description": "Researches a topic", "tags": ["search"] } + ], + "defaultInputModes": ["application/json"], + "defaultOutputModes": ["application/json"], + "queueEndpoint": { + "technology": "rabbitmq", + "host": "rabbitmq.prod.example.com", + "port": 5672, + "virtualHost": "/", + "exchange": "rockbot", + "taskTopic": "agent.task.ResearchAgent", + "responseTopic": "agent.response.{callerName}" + } +} +``` + +Response `201 Created`: +```json +{ + "id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", + "name": "ResearchAgent", + "description": "On-demand research agent", + "version": "1.0", + "skills": [{ "id": "research", "name": "Research", "description": "Researches a topic", "tags": ["search"] }], + "defaultInputModes": ["application/json"], + "defaultOutputModes": ["application/json"], + "queueEndpoint": { + "technology": "rabbitmq", + "host": "rabbitmq.prod.example.com", + "port": 5672, + "virtualHost": "/", + "exchange": "rockbot", + "taskTopic": "agent.task.ResearchAgent", + "responseTopic": "agent.response.{callerName}" + }, + "isLive": true +} +``` + +### Azure Service Bus agent + +```json +POST /a2a/async/agents +{ + "name": "InvoiceProcessor", + "description": "Processes invoice documents", + "version": "1.0", + "skills": [ + { "id": "process-invoice", "name": "Process Invoice", "description": "Extracts and validates invoice data", "tags": ["finance", "ocr"] } + ], + "defaultInputModes": ["application/json", "application/pdf"], + "defaultOutputModes": ["application/json"], + "queueEndpoint": { + "technology": "azure-service-bus", + "namespace": "mybus.servicebus.windows.net", + "entityPath": "invoice-processor-tasks", + "taskTopic": "invoice-processor-tasks", + "responseTopic": "invoice-processor-responses" + } +} +``` + +### Keeping liveness alive (KEDA worker pattern) + +An ephemeral, KEDA-scaled worker registers on pod startup and renews on each task: + +```bash +# On pod startup — register with 5-minute TTL +curl -X POST https://registry.example.com/a2a/async/agents \ + -H "X-Api-Key: $REGISTRY_KEY" \ + -d '{ "name": "ResearchAgent", ..., "livenessModel": "Ephemeral", "ttlSeconds": 300 }' + +# Capture the endpoint ID from the response, then on each task invocation: +curl -X POST https://registry.example.com/agents/$AGENT_ID/endpoints/$ENDPOINT_ID/renew \ + -H "X-Api-Key: $REGISTRY_KEY" +``` + +When the pod scales to zero, the TTL expires and the agent is no longer returned in `liveOnly=true` queries. The queue address in the card remains valid — KEDA will scale a new pod when messages arrive. + +For persistent workers, use `heartbeatIntervalSeconds` instead and call `POST .../heartbeat` on schedule. + +## Discovering queued agents + +```bash +# All live queued A2A agents +GET /a2a/async/agents + +# Filter by capability +GET /a2a/async/agents?capability=research&liveOnly=true + +# Filter by tag +GET /a2a/async/agents?tags=finance,ocr +``` + +Discovery returns a paginated list: + +```json +{ + "agents": [...], + "totalCount": 12, + "page": 1, + "pageSize": 20, + "totalPages": 1, + "hasNextPage": false +} +``` + +Queued agents also appear in the generic discovery endpoint (`GET /discover/agents?protocol=A2A`) alongside HTTP A2A agents, since they share `ProtocolType.A2A`.