Skip to content

Latest commit

 

History

History
305 lines (232 loc) · 10.7 KB

File metadata and controls

305 lines (232 loc) · 10.7 KB

Messaging subsystem

The messaging subsystem provides the communication backbone for all inter-component communication in RockBot. Agents, user proxies, script runners, and tool bridges communicate exclusively through a topic-based pub/sub message bus — no direct method calls, no shared memory, no in-process coupling.


Design principles

  • Transport agnosticism — all contracts are defined in RockBot.Messaging.Abstractions with no provider-specific types leaking into application code
  • Explicit acknowledgment — handlers return MessageResult (Ack/Retry/DeadLetter); the infrastructure never silently drops or requeues messages
  • Immutable envelopesMessageEnvelope is a sealed record; the body is raw bytes, not a typed object, so the transport layer is agnostic to payload schema
  • Built-in observability — W3C trace context is propagated through headers; metrics are recorded per destination and result

Core abstractions

MessageEnvelope

The universal message container. Every message flowing through the system, regardless of type or direction, is wrapped in a MessageEnvelope.

public sealed record MessageEnvelope(
    string MessageId,           // Unique ID (GUID, auto-generated by Create())
    string MessageType,         // Type classifier — used for routing and deserialization
    string Source,              // Originating component name
    ReadOnlyMemory<byte> Body,  // Raw payload (JSON-encoded, but transport doesn't know this)
    DateTimeOffset Timestamp,
    string? CorrelationId,      // Links a reply to its originating request
    string? ReplyTo,            // Topic where replies should be published
    string? Destination,        // Target agent name; null for broadcast
    IReadOnlyDictionary<string, string> Headers  // Custom metadata (see WellKnownHeaders)
);

MessageEnvelope.Create(messageType, source, body) generates a new MessageId (GUID) and defaults Timestamp to DateTimeOffset.UtcNow.

IMessagePublisher

public interface IMessagePublisher : IAsyncDisposable
{
    Task PublishAsync(string topic, MessageEnvelope envelope, CancellationToken ct = default);
}

Publishes a message to a named topic. The topic is a hierarchical dot-separated string used as a routing key (e.g. user.message, tool.invoke.mcp, script.invoke).

IMessageSubscriber

public interface IMessageSubscriber : IAsyncDisposable
{
    Task<ISubscription> SubscribeAsync(
        string topic,
        string subscriptionName,
        Func<MessageEnvelope, CancellationToken, Task<MessageResult>> handler,
        CancellationToken ct = default);
}

Creates a durable subscription. Each subscriptionName gets its own queue — multiple agents with the same subscription name form a competing-consumer group (only one processes each message). Different subscription names on the same topic receive independent copies.

Wildcard patterns are supported:

  • * — matches exactly one path segment: tool.invoke.* matches tool.invoke.mcp but not tool.invoke.mcp.list
  • # — matches zero or more segments: agent.# matches everything below agent

ISubscription

public interface ISubscription : IAsyncDisposable
{
    string Topic { get; }
    string SubscriptionName { get; }
    bool IsActive { get; }
}

A handle to an active subscription. Disposing it unsubscribes and releases the channel.

MessageResult

The handler's explicit decision about what happens to the message after processing:

Value Meaning
Ack Processed successfully — remove from queue
Retry Transient failure — requeue for redelivery
DeadLetter Poison message — route to dead-letter queue, do not retry

Payload helpers

MessageEnvelopeExtensions (in RockBot.Messaging.Abstractions) handles JSON serialization using System.Text.Json with camelCase policy:

// Wrap a typed payload into an envelope
var envelope = userMessage.ToEnvelope<UserMessage>(
    source: "blazor-proxy",
    correlationId: Guid.NewGuid().ToString("N"),
    replyTo: "user.response.proxy-1",
    destination: "rockbot-agent");

// Unwrap a typed payload from an envelope
var userMessage = envelope.GetPayload<UserMessage>();

Both methods accept an optional JsonSerializerOptions to override the default camelCase policy.


Headers and trust levels

WellKnownHeaders defines standard header keys used by the framework:

Header key Constant Purpose
rb-content-trust ContentTrust Trust level of the content
rb-tool-provider ToolProvider Backend type (mcp, etc.)
rb-timeout-ms TimeoutMs Invocation timeout override

ContentTrustValues defines standard values for the rb-content-trust header:

Value Meaning
system Agent-generated system prompts and directives
user-input Human user messages
tool-request Outbound tool invocations
tool-output External tool responses — treated as untrusted
agent-message Agent-to-agent content

Trust levels allow middleware and handlers to apply different validation or sanitization policies depending on the origin of content.


Trace context propagation

TraceContextPropagator propagates W3C TraceContext through message headers using only System.Diagnostics — no OpenTelemetry SDK dependency in the abstractions layer:

// Inject current Activity context into outgoing headers
TraceContextPropagator.Inject(Activity.Current, headers);

// Extract parent context from incoming headers
var parentContext = TraceContextPropagator.Extract(envelope.Headers);

The publisher injects trace context into every outgoing envelope's headers. The subscriber extracts it and creates a child Activity, so distributed traces flow seamlessly across process boundaries through the message bus.


RabbitMQ provider

RockBot.Messaging.RabbitMQ implements the messaging abstractions using RabbitMQ with a topic exchange. No application code references RabbitMQ types directly.

Exchange topology

rockbot (topic exchange)
  ├── rockbot.{subscriptionName}          ← durable queue per subscription
  │     DLX: rockbot.dlx
  │
  └── rockbot.{subscriptionName}.dlq     ← dead-letter queue (auto-created)
        Bound to: rockbot.dlx

Every queue has a dead-letter exchange configured at creation time. Messages that return DeadLetter from a handler (or are rejected after exhausting retries) route to rockbot.{subscriptionName}.dlq for inspection.

Connection and channel model

RabbitMQ connections are heavyweight (TCP + authentication); channels are lightweight. The provider follows RabbitMQ's recommended pattern:

  • One connection per processRabbitMqConnectionManager holds a single shared IConnection
  • One channel per publisher/consumer — channels are not thread-safe; each Publish call uses a dedicated channel per publisher instance; each subscriber gets a dedicated channel per subscription

Configuration

public sealed class RabbitMqOptions
{
    public string HostName { get; set; } = "localhost";
    public int Port { get; set; } = 5672;
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public string VirtualHost { get; set; } = "/";
    public string ExchangeName { get; set; } = "rockbot";
    public string DeadLetterExchangeName { get; set; } = "rockbot.dlx";
    public bool Durable { get; set; } = true;
    public ushort PrefetchCount { get; set; } = 10;
}

AMQP header mapping

Standard AMQP properties are mapped from MessageEnvelope fields:

AMQP property MessageEnvelope field
MessageId MessageId
Type MessageType
CorrelationId CorrelationId
ReplyTo ReplyTo
Timestamp Timestamp (Unix epoch seconds)
ContentType Always "application/json"
DeliveryMode Always 2 (persistent)

Custom headers are written to the AMQP Headers table with rb- prefix:

AMQP header key Source
rb-source envelope.Source
rb-destination envelope.Destination
rb-traceparent Injected by TraceContextPropagator
rb-{key} Any user-supplied envelope.Headers entry

Metrics

RabbitMqDiagnostics records zero-allocation metrics via System.Diagnostics.Metrics:

Metric Type Tags
rockbot.messaging.publish.duration Histogram (ms) destination
rockbot.messaging.publish.messages Counter destination
rockbot.messaging.process.duration Histogram (ms) destination, result
rockbot.messaging.process.messages Counter destination, result
rockbot.messaging.active_messages UpDownCounter

DI registration

services.AddRockBotRabbitMq(opts =>
{
    opts.HostName = "rabbitmq.cluster.local";
    opts.Port = 5672;
    opts.UserName = "rockbot";
    opts.Password = "secret";
    opts.VirtualHost = "/";
});

This registers RabbitMqConnectionManager, IMessagePublisher, and IMessageSubscriber as singletons.


Topic naming conventions

Topics follow a hierarchical dot-separated scheme. Wildcard subscribers can cover broad categories; point-to-point uses full topic names.

Topic Direction Purpose
user.message User proxy → Agent User input messages
user.response.{proxyId} Agent → User proxy Agent replies
user.feedback User proxy → Agent Thumbs-up / thumbs-down
conversation.history.request User proxy → Agent Request conversation history
conversation.history.response.{proxyId} Agent → User proxy History reply
tool.invoke.* Agent → Tool bridge Tool invocation requests
tool.result.* Tool bridge → Agent Tool results
script.invoke Agent → Scripts Manager Script execution request
script.result.{correlationId} Scripts Manager → Agent Script output
agent.task.* Agent → Agent A2A task delegation

In-process bus (development/testing)

RockBot.Messaging.InProcess provides an in-memory message bus with the same IMessagePublisher / IMessageSubscriber interfaces. No RabbitMQ required.

Use for:

  • Local development without a running RabbitMQ instance
  • Unit and integration tests
  • Single-process multi-agent scenarios

Register with:

services.AddInProcessMessaging();