From 92140c9c0c995ed0cd5a3fa8b66cb557fefb399e Mon Sep 17 00:00:00 2001 From: Rockford Lhotka Date: Thu, 2 Apr 2026 14:33:13 -0500 Subject: [PATCH] Add inbound A2A peer support with trust model and idle notifications (#99) Make RockBot a full A2A peer so external agents can discover it and send it tasks. Implements pluggable identity verification (IAgentIdentityVerifier), a four-level per-caller trust model (Observe/Learn/Propose/Act), and an idle-aware notification queue that batches inbound A2A results for the user. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../AgentTrustEntry.cs | 26 ++ .../AgentTrustLevel.cs | 20 ++ .../IAgentIdentityVerifier.cs | 18 ++ .../IAgentTrustStore.cs | 24 ++ .../VerifiedAgentIdentity.cs | 40 +++ src/RockBot.A2A/A2AOptions.cs | 7 + .../A2AServiceCollectionExtensions.cs | 10 + src/RockBot.A2A/FileAgentTrustStore.cs | 111 ++++++++ .../IdentityVerificationMiddleware.cs | 50 ++++ .../NameBasedAgentIdentityVerifier.cs | 27 ++ src/RockBot.Agent/A2A/InboundA2AToolSet.cs | 43 +++ src/RockBot.Agent/A2A/RockBotTaskHandler.cs | 261 ++++++++++++++++++ src/RockBot.Agent/Program.cs | 35 ++- .../IInboundNotificationQueue.cs | 17 ++ .../InboundNotification.cs | 23 ++ src/RockBot.Host/InboundNotificationQueue.cs | 27 ++ .../InboundNotificationService.cs | 123 +++++++++ .../ServiceCollectionExtensions.cs | 2 + src/RockBot.UserProxy.Blazor/Pages/Chat.razor | 9 +- .../Services/BlazorUserFrontend.cs | 4 + .../wwwroot/css/app.css | 3 +- .../IdentityVerificationMiddlewareTests.cs | 124 +++++++++ .../IdentityVerificationTests.cs | 61 ++++ tests/RockBot.A2A.Tests/TrustStoreTests.cs | 104 +++++++ .../AgentProfileExtensionsTests.cs | 8 + .../InboundNotificationQueueTests.cs | 65 +++++ .../ServiceCollectionExtensionsTests.cs | 8 + 27 files changed, 1239 insertions(+), 11 deletions(-) create mode 100644 src/RockBot.A2A.Abstractions/AgentTrustEntry.cs create mode 100644 src/RockBot.A2A.Abstractions/AgentTrustLevel.cs create mode 100644 src/RockBot.A2A.Abstractions/IAgentIdentityVerifier.cs create mode 100644 src/RockBot.A2A.Abstractions/IAgentTrustStore.cs create mode 100644 src/RockBot.A2A.Abstractions/VerifiedAgentIdentity.cs create mode 100644 src/RockBot.A2A/FileAgentTrustStore.cs create mode 100644 src/RockBot.A2A/IdentityVerificationMiddleware.cs create mode 100644 src/RockBot.A2A/NameBasedAgentIdentityVerifier.cs create mode 100644 src/RockBot.Agent/A2A/InboundA2AToolSet.cs create mode 100644 src/RockBot.Agent/A2A/RockBotTaskHandler.cs create mode 100644 src/RockBot.Host.Abstractions/IInboundNotificationQueue.cs create mode 100644 src/RockBot.Host.Abstractions/InboundNotification.cs create mode 100644 src/RockBot.Host/InboundNotificationQueue.cs create mode 100644 src/RockBot.Host/InboundNotificationService.cs create mode 100644 tests/RockBot.A2A.Tests/IdentityVerificationMiddlewareTests.cs create mode 100644 tests/RockBot.A2A.Tests/IdentityVerificationTests.cs create mode 100644 tests/RockBot.A2A.Tests/TrustStoreTests.cs create mode 100644 tests/RockBot.Host.Tests/InboundNotificationQueueTests.cs diff --git a/src/RockBot.A2A.Abstractions/AgentTrustEntry.cs b/src/RockBot.A2A.Abstractions/AgentTrustEntry.cs new file mode 100644 index 0000000..b23cba2 --- /dev/null +++ b/src/RockBot.A2A.Abstractions/AgentTrustEntry.cs @@ -0,0 +1,26 @@ +namespace RockBot.A2A; + +/// +/// Per-caller trust record tracking the trust level, approved skills, and +/// interaction history for an external agent identified by . +/// +public sealed record AgentTrustEntry +{ + /// Canonical unique identifier for the caller (from ). + public required string AgentId { get; init; } + + /// Current trust level for this caller. + public required AgentTrustLevel Level { get; init; } + + /// Skill IDs this caller is approved to invoke autonomously (Level 4). + public IReadOnlyList ApprovedSkills { get; init; } = []; + + /// When this caller was first seen. + public DateTimeOffset FirstSeen { get; init; } + + /// When the last interaction with this caller occurred. + public DateTimeOffset LastInteraction { get; init; } + + /// Total number of inbound tasks received from this caller. + public int InteractionCount { get; init; } +} diff --git a/src/RockBot.A2A.Abstractions/AgentTrustLevel.cs b/src/RockBot.A2A.Abstractions/AgentTrustLevel.cs new file mode 100644 index 0000000..1b76f86 --- /dev/null +++ b/src/RockBot.A2A.Abstractions/AgentTrustLevel.cs @@ -0,0 +1,20 @@ +namespace RockBot.A2A; + +/// +/// Trust level assigned to an external agent caller. Each caller progresses +/// through these levels independently based on user approval. +/// +public enum AgentTrustLevel +{ + /// Read-only access; summarize request and notify user. + Observe = 1, + + /// Same as Observe, but system observes user responses and proposes skill drafts. + Learn = 2, + + /// System has candidate skills and asks user to approve them. + Propose = 3, + + /// Approved skills execute autonomously; results reported to user post-hoc. + Act = 4 +} diff --git a/src/RockBot.A2A.Abstractions/IAgentIdentityVerifier.cs b/src/RockBot.A2A.Abstractions/IAgentIdentityVerifier.cs new file mode 100644 index 0000000..1cc4ff3 --- /dev/null +++ b/src/RockBot.A2A.Abstractions/IAgentIdentityVerifier.cs @@ -0,0 +1,18 @@ +using RockBot.Messaging; + +namespace RockBot.A2A; + +/// +/// Verifies the identity of an agent from an inbound message envelope. +/// Implementations may inspect headers (tokens, signatures), the Source field, +/// or any other envelope metadata to establish a verified identity. +/// Register a custom implementation via DI to replace the default name-based verifier. +/// +public interface IAgentIdentityVerifier +{ + /// + /// Verifies the sender identity from the envelope metadata. + /// Returns a on success, or throws if verification fails. + /// + Task VerifyAsync(MessageEnvelope envelope, CancellationToken ct); +} diff --git a/src/RockBot.A2A.Abstractions/IAgentTrustStore.cs b/src/RockBot.A2A.Abstractions/IAgentTrustStore.cs new file mode 100644 index 0000000..e34861c --- /dev/null +++ b/src/RockBot.A2A.Abstractions/IAgentTrustStore.cs @@ -0,0 +1,24 @@ +namespace RockBot.A2A; + +/// +/// Persistent store for per-caller trust entries. Implementations must be +/// thread-safe — concurrent A2A requests may read/write simultaneously. +/// +public interface IAgentTrustStore +{ + /// + /// Returns the trust entry for , creating a new + /// entry at if none exists. + /// + Task GetOrCreateAsync(string agentId, CancellationToken ct); + + /// + /// Persists an updated trust entry. The entry is matched by . + /// + Task UpdateAsync(AgentTrustEntry entry, CancellationToken ct); + + /// + /// Returns all known trust entries. + /// + Task> ListAsync(CancellationToken ct); +} diff --git a/src/RockBot.A2A.Abstractions/VerifiedAgentIdentity.cs b/src/RockBot.A2A.Abstractions/VerifiedAgentIdentity.cs new file mode 100644 index 0000000..444557a --- /dev/null +++ b/src/RockBot.A2A.Abstractions/VerifiedAgentIdentity.cs @@ -0,0 +1,40 @@ +namespace RockBot.A2A; + +/// +/// The result of identity verification for an inbound agent message. +/// is the stable key used for trust tracking. +/// +public sealed record VerifiedAgentIdentity +{ + /// + /// Key used to store/retrieve in + /// . + /// + public const string ContextKey = "verified-identity"; + + /// + /// Canonical unique identifier for the agent. Used as the key in trust stores. + /// For name-based verification this equals the Source string; for registry-backed + /// verification it would be a registry-issued identifier. + /// + public required string AgentId { get; init; } + + /// Human-readable display name for the agent. + public required string DisplayName { get; init; } + + /// + /// Who vouched for this identity (e.g. "self", a registry URL, an IdP issuer). + /// + public string? Issuer { get; init; } + + /// + /// Extensible claims extracted during verification (e.g. roles, scopes, OBO subject). + /// + public IReadOnlyDictionary? Claims { get; init; } + + /// + /// True when identity is based solely on the sender's self-asserted Source string + /// with no cryptographic or registry-backed verification. + /// + public bool IsSelfAsserted { get; init; } +} diff --git a/src/RockBot.A2A/A2AOptions.cs b/src/RockBot.A2A/A2AOptions.cs index ee7edb6..a468aca 100644 --- a/src/RockBot.A2A/A2AOptions.cs +++ b/src/RockBot.A2A/A2AOptions.cs @@ -29,6 +29,13 @@ public sealed class A2AOptions /// public TimeSpan DirectoryEntryTtl { get; set; } = TimeSpan.FromHours(24); + /// + /// Path to the file where per-caller trust entries are persisted. + /// Relative paths are resolved from . + /// Set to null or empty to disable persistence. + /// + public string? TrustStorePath { get; set; } = "agent-trust.json"; + /// /// Statically-configured agents that are always included in list_known_agents /// regardless of whether they have announced themselves on the discovery bus. diff --git a/src/RockBot.A2A/A2AServiceCollectionExtensions.cs b/src/RockBot.A2A/A2AServiceCollectionExtensions.cs index 86fb15d..a753140 100644 --- a/src/RockBot.A2A/A2AServiceCollectionExtensions.cs +++ b/src/RockBot.A2A/A2AServiceCollectionExtensions.cs @@ -32,6 +32,16 @@ public static AgentHostBuilder AddA2A( sp => sp.GetRequiredService()); } + // Identity verification — default to name-based; users can override via DI + builder.Services.TryAddSingleton(); + + // Trust store — default to file-backed; users can override via DI + builder.Services.TryAddSingleton(sp => + new FileAgentTrustStore(options.TrustStorePath)); + + // Identity verification middleware — verifies A2A inbound messages + builder.UseMiddleware(); + // Summarizer — uses ILlmClient if available, otherwise falls back gracefully builder.Services.TryAddSingleton(); diff --git a/src/RockBot.A2A/FileAgentTrustStore.cs b/src/RockBot.A2A/FileAgentTrustStore.cs new file mode 100644 index 0000000..e16c5c4 --- /dev/null +++ b/src/RockBot.A2A/FileAgentTrustStore.cs @@ -0,0 +1,111 @@ +using System.Collections.Concurrent; +using System.Text.Json; + +namespace RockBot.A2A; + +/// +/// File-backed trust store that persists records as JSON. +/// Thread-safe via with debounced writes. +/// +internal sealed class FileAgentTrustStore : IAgentTrustStore +{ + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + PropertyNameCaseInsensitive = true, + WriteIndented = true + }; + + private readonly ConcurrentDictionary _entries = + new(StringComparer.OrdinalIgnoreCase); + + private readonly string? _filePath; + private readonly SemaphoreSlim _writeLock = new(1, 1); + private volatile bool _loaded; + + public FileAgentTrustStore(string? filePath) + { + _filePath = string.IsNullOrWhiteSpace(filePath) ? null : filePath; + } + + public async Task GetOrCreateAsync(string agentId, CancellationToken ct) + { + await EnsureLoadedAsync(ct); + + if (_entries.TryGetValue(agentId, out var existing)) + return existing; + + var entry = new AgentTrustEntry + { + AgentId = agentId, + Level = AgentTrustLevel.Observe, + FirstSeen = DateTimeOffset.UtcNow, + LastInteraction = DateTimeOffset.UtcNow, + InteractionCount = 0 + }; + entry = _entries.GetOrAdd(agentId, entry); + await PersistAsync(ct); + return entry; + } + + public async Task UpdateAsync(AgentTrustEntry entry, CancellationToken ct) + { + await EnsureLoadedAsync(ct); + _entries[entry.AgentId] = entry; + await PersistAsync(ct); + } + + public async Task> ListAsync(CancellationToken ct) + { + await EnsureLoadedAsync(ct); + return _entries.Values.ToList(); + } + + private async Task EnsureLoadedAsync(CancellationToken ct) + { + if (_loaded) return; + + await _writeLock.WaitAsync(ct); + try + { + if (_loaded) return; + + if (_filePath is not null && File.Exists(_filePath)) + { + var json = await File.ReadAllTextAsync(_filePath, ct); + var entries = JsonSerializer.Deserialize>(json, JsonOptions); + if (entries is not null) + { + foreach (var entry in entries) + _entries.TryAdd(entry.AgentId, entry); + } + } + + _loaded = true; + } + finally + { + _writeLock.Release(); + } + } + + private async Task PersistAsync(CancellationToken ct) + { + if (_filePath is null) return; + + await _writeLock.WaitAsync(ct); + try + { + var entries = _entries.Values.ToList(); + var json = JsonSerializer.Serialize(entries, JsonOptions); + var dir = Path.GetDirectoryName(_filePath); + if (!string.IsNullOrEmpty(dir)) + Directory.CreateDirectory(dir); + await File.WriteAllTextAsync(_filePath, json, ct); + } + finally + { + _writeLock.Release(); + } + } +} diff --git a/src/RockBot.A2A/IdentityVerificationMiddleware.cs b/src/RockBot.A2A/IdentityVerificationMiddleware.cs new file mode 100644 index 0000000..60bf78b --- /dev/null +++ b/src/RockBot.A2A/IdentityVerificationMiddleware.cs @@ -0,0 +1,50 @@ +using Microsoft.Extensions.Logging; +using RockBot.Host; +using RockBot.Messaging; + +namespace RockBot.A2A; + +/// +/// Middleware that verifies the identity of inbound A2A messages using +/// . Stores the verified identity in +/// under the key +/// for downstream handlers. +/// Only runs on A2A task-related messages; passes all other messages through unchanged. +/// +internal sealed class IdentityVerificationMiddleware( + IAgentIdentityVerifier verifier, + ILogger logger) : IMiddleware +{ + public async Task InvokeAsync(MessageHandlerContext context, MessageHandlerDelegate next) + { + if (!IsA2AMessage(context.Envelope)) + { + await next(context); + return; + } + + try + { + var identity = await verifier.VerifyAsync(context.Envelope, context.CancellationToken); + context.Items[VerifiedAgentIdentity.ContextKey] = identity; + logger.LogDebug("Verified inbound A2A identity: {AgentId} (self-asserted: {SelfAsserted})", + identity.AgentId, identity.IsSelfAsserted); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Identity verification failed for message {MessageId} from source '{Source}'", + context.Envelope.MessageId, context.Envelope.Source); + context.Result = MessageResult.DeadLetter; + return; + } + + await next(context); + } + + private static bool IsA2AMessage(MessageEnvelope envelope) + { + var type = envelope.MessageType; + return type.Contains(nameof(AgentTaskRequest), StringComparison.Ordinal) || + type.Contains(nameof(AgentTaskCancelRequest), StringComparison.Ordinal); + } +} diff --git a/src/RockBot.A2A/NameBasedAgentIdentityVerifier.cs b/src/RockBot.A2A/NameBasedAgentIdentityVerifier.cs new file mode 100644 index 0000000..c4c1818 --- /dev/null +++ b/src/RockBot.A2A/NameBasedAgentIdentityVerifier.cs @@ -0,0 +1,27 @@ +using RockBot.Messaging; + +namespace RockBot.A2A; + +/// +/// Prototype identity verifier that trusts the envelope's Source field at face value. +/// Returns = true to indicate +/// no cryptographic or registry-backed verification was performed. +/// Replace via DI with a custom for production use. +/// +internal sealed class NameBasedAgentIdentityVerifier : IAgentIdentityVerifier +{ + public Task VerifyAsync(MessageEnvelope envelope, CancellationToken ct) + { + if (string.IsNullOrWhiteSpace(envelope.Source)) + throw new InvalidOperationException("Cannot verify identity: envelope Source is empty."); + + var identity = new VerifiedAgentIdentity + { + AgentId = envelope.Source, + DisplayName = envelope.Source, + Issuer = "self", + IsSelfAsserted = true + }; + return Task.FromResult(identity); + } +} diff --git a/src/RockBot.Agent/A2A/InboundA2AToolSet.cs b/src/RockBot.Agent/A2A/InboundA2AToolSet.cs new file mode 100644 index 0000000..8121f2e --- /dev/null +++ b/src/RockBot.Agent/A2A/InboundA2AToolSet.cs @@ -0,0 +1,43 @@ +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Logging; +using RockBot.Host; +using RockBot.Memory; + +namespace RockBot.Agent.A2A; + +/// +/// Assembles a restricted tool set for inbound A2A task processing at Level 1 (Observe). +/// Only read-oriented tools are exposed: working memory read/list/search, long-term memory +/// search, and a scoped working memory write limited to the a2a-inbox namespace. +/// +internal static class InboundA2AToolSet +{ + /// + /// Builds the restricted tool list for an inbound A2A task. + /// + /// Global working memory instance. + /// Long-term memory tools (only SearchMemory is included). + /// The A2A task ID — used as the working memory namespace. + /// Logger for tool invocations. + public static IList Build( + IWorkingMemory workingMemory, + MemoryTools memoryTools, + string taskId, + ILogger logger) + { + // Working memory scoped to a2a-inbox/{taskId} — writes are contained to this namespace + var wmTools = new WorkingMemoryTools(workingMemory, $"a2a-inbox/{taskId}", logger); + + // From working memory tools: include all (read + write scoped to inbox namespace) + var tools = new List(wmTools.Tools); + + // From long-term memory: include only SearchMemory (read-only) + var searchMemory = memoryTools.Tools + .OfType() + .FirstOrDefault(f => f.Name == "SearchMemory"); + if (searchMemory is not null) + tools.Add(searchMemory); + + return tools; + } +} diff --git a/src/RockBot.Agent/A2A/RockBotTaskHandler.cs b/src/RockBot.Agent/A2A/RockBotTaskHandler.cs new file mode 100644 index 0000000..d880531 --- /dev/null +++ b/src/RockBot.Agent/A2A/RockBotTaskHandler.cs @@ -0,0 +1,261 @@ +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Logging; +using RockBot.A2A; +using RockBot.Host; +using RockBot.Memory; + +namespace RockBot.Agent.A2A; + +/// +/// Handles inbound A2A task requests directed at RockBot. Dispatches by trust level: +/// +/// Level 1 (Observe): read-only LLM pass, writes summary to working memory, notifies user +/// Level 4 (Act): executes approved skills autonomously (notify-user, query-availability) +/// +/// +internal sealed class RockBotTaskHandler( + AgentLoopRunner agentLoopRunner, + IWorkingMemory workingMemory, + MemoryTools memoryTools, + IAgentTrustStore trustStore, + IInboundNotificationQueue notificationQueue, + IUserActivityMonitor userActivityMonitor, + ISessionTracker sessionTracker, + ILogger logger) : IAgentTaskHandler +{ + private const string ObserveSystemPrompt = + """ + An external agent has sent you a task request. You are evaluating it on behalf of your user. + + Your job: + 1. Analyze the request — what is the caller asking for? + 2. Check your long-term memories (SearchMemory) for any relevant context about the caller or topic. + 3. Write a concise summary and suggested action to working memory so the user can review it. + + You MUST save the following to working memory: + - Key "summary": A brief summary of what the caller wants and your recommended action for the user. + + Do NOT take any actions, make commitments, or respond on behalf of the user. + Do NOT write to long-term memory. + You are strictly in observation mode. + """; + + /// Default session used to check idle state (matches Blazor UI hardcoded session). + private const string PrimarySessionId = "blazor-session"; + + public async Task HandleTaskAsync(AgentTaskRequest request, AgentTaskContext context) + { + var ct = context.MessageContext.CancellationToken; + + // Extract verified identity (placed by IdentityVerificationMiddleware) + var identity = context.MessageContext.Items.TryGetValue( + VerifiedAgentIdentity.ContextKey, out var obj) && obj is VerifiedAgentIdentity vid + ? vid + : new VerifiedAgentIdentity + { + AgentId = context.MessageContext.Envelope.Source ?? "unknown", + DisplayName = context.MessageContext.Envelope.Source ?? "unknown", + Issuer = "fallback", + IsSelfAsserted = true + }; + + logger.LogInformation( + "Inbound A2A task {TaskId} from {CallerId} (skill={Skill}, self-asserted={SelfAsserted})", + request.TaskId, identity.AgentId, request.Skill, identity.IsSelfAsserted); + + // Update trust tracking + var trust = await trustStore.GetOrCreateAsync(identity.AgentId, ct); + trust = trust with + { + LastInteraction = DateTimeOffset.UtcNow, + InteractionCount = trust.InteractionCount + 1 + }; + await trustStore.UpdateAsync(trust, ct); + + // Dispatch built-in skills for Level 4 callers with approved skills + if (trust.Level >= AgentTrustLevel.Act && + trust.ApprovedSkills.Contains(request.Skill, StringComparer.OrdinalIgnoreCase)) + { + return request.Skill.ToLowerInvariant() switch + { + "notify-user" => await HandleNotifyUserAsync(request, identity, ct), + "query-availability" => HandleQueryAvailability(request), + _ => await HandleObserveAsync(request, identity, context, ct) + }; + } + + // Default: Level 1 (Observe) behavior for all callers + return await HandleObserveAsync(request, identity, context, ct); + } + + private async Task HandleObserveAsync( + AgentTaskRequest request, + VerifiedAgentIdentity caller, + AgentTaskContext context, + CancellationToken ct) + { + // Publish Working status + await context.PublishStatus(new AgentTaskStatusUpdate + { + TaskId = request.TaskId, + ContextId = request.ContextId, + State = AgentTaskState.Working, + Message = new AgentMessage + { + Role = "agent", + Parts = [new AgentMessagePart { Kind = "text", Text = "Reviewing your request..." }] + } + }, ct); + + var question = ExtractText(request); + var sessionId = $"a2a-inbound/{request.TaskId}"; + + // Build restricted tool set + var tools = InboundA2AToolSet.Build(workingMemory, memoryTools, request.TaskId, logger); + + var chatOptions = new ChatOptions { Tools = [.. tools] }; + var chatMessages = new List + { + new(ChatRole.System, ObserveSystemPrompt), + new(ChatRole.User, + $"Inbound request from agent '{caller.DisplayName}' (ID: {caller.AgentId}, " + + $"self-asserted: {caller.IsSelfAsserted}).\n" + + $"Skill requested: {request.Skill}\n\n" + + $"Message:\n{question}") + }; + + // Run read-only LLM pass + var summary = await agentLoopRunner.RunAsync( + chatMessages, chatOptions, sessionId, + tier: ModelTier.Low, + enableFollowUp: false, + enableCompletionEval: false, + cancellationToken: ct); + + // Ensure caller info is in working memory + await workingMemory.SetAsync( + $"a2a-inbox/{request.TaskId}/caller", + $"Agent: {caller.DisplayName} (ID: {caller.AgentId}, issuer: {caller.Issuer ?? "unknown"}, " + + $"self-asserted: {caller.IsSelfAsserted})", + TimeSpan.FromHours(24), "a2a", ["inbound", "caller"]); + + await workingMemory.SetAsync( + $"a2a-inbox/{request.TaskId}/request", + $"Skill: {request.Skill}\n\n{question}", + TimeSpan.FromHours(24), "a2a", ["inbound", "request"]); + + await workingMemory.SetAsync( + $"a2a-inbox/{request.TaskId}/status", + "pending-review", + TimeSpan.FromHours(24), "a2a", ["inbound", "status"]); + + logger.LogInformation( + "A2A task {TaskId} from {CallerId} processed at Observe level, summary length={Len}", + request.TaskId, caller.AgentId, summary.Length); + + // Queue notification for the user (batched delivery when idle) + await notificationQueue.EnqueueAsync(new InboundNotification + { + TaskId = request.TaskId, + CallerName = caller.DisplayName, + Summary = summary, + ReceivedAt = DateTimeOffset.UtcNow, + SkillId = request.Skill + }, ct); + + return new AgentTaskResult + { + TaskId = request.TaskId, + ContextId = request.ContextId, + State = AgentTaskState.Completed, + Message = new AgentMessage + { + Role = "agent", + Parts = [new AgentMessagePart + { + Kind = "text", + Text = "Your request has been received and the user will be notified. " + + "A summary and suggested action have been prepared for their review." + }] + } + }; + } + + private async Task HandleNotifyUserAsync( + AgentTaskRequest request, + VerifiedAgentIdentity caller, + CancellationToken ct) + { + var message = ExtractText(request); + + // Write notification to A2A inbox + await workingMemory.SetAsync( + $"a2a-inbox/{request.TaskId}/summary", + $"Notification from {caller.DisplayName}: {message}", + TimeSpan.FromHours(24), "a2a", ["inbound", "notification"]); + + await workingMemory.SetAsync( + $"a2a-inbox/{request.TaskId}/status", + "notification-delivered", + TimeSpan.FromHours(24), "a2a", ["inbound", "status"]); + + logger.LogInformation("A2A notify-user from {CallerId}: {Preview}", + caller.AgentId, message.Length > 100 ? message[..100] + "..." : message); + + await notificationQueue.EnqueueAsync(new InboundNotification + { + TaskId = request.TaskId, + CallerName = caller.DisplayName, + Summary = $"Notification: {message}", + ReceivedAt = DateTimeOffset.UtcNow, + SkillId = "notify-user" + }, ct); + + return new AgentTaskResult + { + TaskId = request.TaskId, + ContextId = request.ContextId, + State = AgentTaskState.Completed, + Message = new AgentMessage + { + Role = "agent", + Parts = [new AgentMessagePart { Kind = "text", Text = "User has been notified." }] + } + }; + } + + private AgentTaskResult HandleQueryAvailability(AgentTaskRequest request) + { + var hasActiveLoop = sessionTracker.HasActiveUserLoop(PrimarySessionId); + var isRecentlyActive = userActivityMonitor.IsUserActive(TimeSpan.FromMinutes(5)); + + var status = (hasActiveLoop, isRecentlyActive) switch + { + (true, _) => "busy", + (false, true) => "available, may be delayed", + (false, false) => "away" + }; + + logger.LogInformation("A2A query-availability: {Status} (activeLoop={Active}, recentActivity={Recent})", + status, hasActiveLoop, isRecentlyActive); + + return new AgentTaskResult + { + TaskId = request.TaskId, + ContextId = request.ContextId, + State = AgentTaskState.Completed, + Message = new AgentMessage + { + Role = "agent", + Parts = [new AgentMessagePart { Kind = "text", Text = status }] + } + }; + } + + private static string ExtractText(AgentTaskRequest request) => + request.Message.Parts + .Where(p => p.Kind == "text") + .Select(p => p.Text) + .FirstOrDefault(t => !string.IsNullOrWhiteSpace(t)) + ?? "(no message provided)"; +} diff --git a/src/RockBot.Agent/Program.cs b/src/RockBot.Agent/Program.cs index f929b48..28e3858 100644 --- a/src/RockBot.Agent/Program.cs +++ b/src/RockBot.Agent/Program.cs @@ -194,17 +194,33 @@ IChatClient BuildClient(LlmTierConfig config) agent.AddHeartbeatBootstrap(opts => builder.Configuration.GetSection("HeartbeatPatrol").Bind(opts)); agent.AddSubagents(); - agent.AddA2ACaller(opts => + var a2aBasePath = builder.Configuration["AgentProfile:BasePath"] + ?? builder.Configuration["AgentProfile__BasePath"] + ?? AppContext.BaseDirectory; + agent.AddA2A(opts => { - var basePath = builder.Configuration["AgentProfile:BasePath"] - ?? builder.Configuration["AgentProfile__BasePath"] - ?? AppContext.BaseDirectory; - opts.DirectoryPersistencePath = Path.Combine(basePath, "known-agents.json"); + opts.Card = new AgentCard + { + AgentName = "RockBot", + Description = "Personal AI agent — accepts notifications and availability queries", + Version = "1.0", + Skills = + [ + new AgentSkill { Id = "notify-user", Name = "Notify User", + Description = "Send a notification to the user" }, + new AgentSkill { Id = "query-availability", Name = "Query Availability", + Description = "Check if the user is available (free/busy)" } + ] + }; + opts.TrustStorePath = Path.Combine(a2aBasePath, "agent-trust.json"); + + // Shared options — AddA2A registers A2AOptions first (AddSingleton); + // AddA2ACaller uses TryAddSingleton so it reuses this instance. + opts.DirectoryPersistencePath = Path.Combine(a2aBasePath, "known-agents.json"); // Well-known agents loaded from a JSON file on the PVC so the list can be - // updated without rebuilding the image. File path mirrors the other agent - // data files (soul.md, directives.md, etc.) under the agent base path. - var wellKnownPath = Path.Combine(basePath, "well-known-agents.json"); + // updated without rebuilding the image. + var wellKnownPath = Path.Combine(a2aBasePath, "well-known-agents.json"); if (File.Exists(wellKnownPath)) { try @@ -217,11 +233,12 @@ IChatClient BuildClient(LlmTierConfig config) } catch (Exception ex) { - // Non-fatal — log at startup but don't prevent the agent from starting Console.Error.WriteLine($"[warn] Could not load well-known agents from {wellKnownPath}: {ex.Message}"); } } }); + agent.Services.AddScoped(); + agent.AddA2ACaller(); agent.AddServiceSearch(); agent.HandleMessage(); agent.HandleMessage(); diff --git a/src/RockBot.Host.Abstractions/IInboundNotificationQueue.cs b/src/RockBot.Host.Abstractions/IInboundNotificationQueue.cs new file mode 100644 index 0000000..9595b82 --- /dev/null +++ b/src/RockBot.Host.Abstractions/IInboundNotificationQueue.cs @@ -0,0 +1,17 @@ +namespace RockBot.Host; + +/// +/// Queue for inbound A2A notifications that accumulate while the user is busy. +/// Notifications are drained and presented as a batch when the user becomes idle. +/// +public interface IInboundNotificationQueue +{ + /// Adds a notification to the queue. + Task EnqueueAsync(InboundNotification notification, CancellationToken ct); + + /// Removes and returns all queued notifications. + Task> DrainAsync(CancellationToken ct); + + /// Number of notifications waiting to be presented. + int PendingCount { get; } +} diff --git a/src/RockBot.Host.Abstractions/InboundNotification.cs b/src/RockBot.Host.Abstractions/InboundNotification.cs new file mode 100644 index 0000000..f65614a --- /dev/null +++ b/src/RockBot.Host.Abstractions/InboundNotification.cs @@ -0,0 +1,23 @@ +namespace RockBot.Host; + +/// +/// Represents a notification from an inbound A2A task that should be +/// presented to the user when they are idle. +/// +public sealed record InboundNotification +{ + /// A2A task ID. + public required string TaskId { get; init; } + + /// Display name of the calling agent. + public required string CallerName { get; init; } + + /// LLM-generated or handler-generated summary of the request. + public required string Summary { get; init; } + + /// When the notification was created. + public required DateTimeOffset ReceivedAt { get; init; } + + /// The A2A skill that was invoked, if any. + public string? SkillId { get; init; } +} diff --git a/src/RockBot.Host/InboundNotificationQueue.cs b/src/RockBot.Host/InboundNotificationQueue.cs new file mode 100644 index 0000000..69795e5 --- /dev/null +++ b/src/RockBot.Host/InboundNotificationQueue.cs @@ -0,0 +1,27 @@ +using System.Collections.Concurrent; + +namespace RockBot.Host; + +/// +/// Thread-safe in-memory queue for inbound A2A notifications. +/// +internal sealed class InboundNotificationQueue : IInboundNotificationQueue +{ + private readonly ConcurrentQueue _queue = new(); + + public int PendingCount => _queue.Count; + + public Task EnqueueAsync(InboundNotification notification, CancellationToken ct) + { + _queue.Enqueue(notification); + return Task.CompletedTask; + } + + public Task> DrainAsync(CancellationToken ct) + { + var items = new List(); + while (_queue.TryDequeue(out var item)) + items.Add(item); + return Task.FromResult>(items); + } +} diff --git a/src/RockBot.Host/InboundNotificationService.cs b/src/RockBot.Host/InboundNotificationService.cs new file mode 100644 index 0000000..d540640 --- /dev/null +++ b/src/RockBot.Host/InboundNotificationService.cs @@ -0,0 +1,123 @@ +using System.Text; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using RockBot.Messaging; +using RockBot.UserProxy; + +namespace RockBot.Host; + +/// +/// Background service that monitors the notification queue and presents batched +/// A2A notifications to the user when they become idle (~2 minutes of inactivity). +/// +internal sealed class InboundNotificationService : IHostedService, IDisposable +{ + /// How long the user must be idle before flushing notifications. + private static readonly TimeSpan IdleThreshold = TimeSpan.FromMinutes(2); + + /// How often we check for idle state. + private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(30); + + /// Session ID used for A2A inbound notifications in the UI. + internal const string A2AInboundSessionId = "a2a-inbound"; + + /// Agent name used for A2A inbound notifications in the UI. + internal const string A2AInboundAgentName = "A2A-Inbox"; + + private readonly IInboundNotificationQueue _queue; + private readonly IUserActivityMonitor _userActivityMonitor; + private readonly ISessionTracker _sessionTracker; + private readonly IMessagePublisher _publisher; + private readonly AgentIdentity _agent; + private readonly ILogger _logger; + private Timer? _timer; + + public InboundNotificationService( + IInboundNotificationQueue queue, + IUserActivityMonitor userActivityMonitor, + ISessionTracker sessionTracker, + IMessagePublisher publisher, + AgentIdentity agent, + ILogger logger) + { + _queue = queue; + _userActivityMonitor = userActivityMonitor; + _sessionTracker = sessionTracker; + _publisher = publisher; + _agent = agent; + _logger = logger; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _timer = new Timer(CheckAndFlush, null, PollInterval, PollInterval); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _timer?.Change(Timeout.Infinite, 0); + return Task.CompletedTask; + } + + public void Dispose() => _timer?.Dispose(); + + private async void CheckAndFlush(object? state) + { + try + { + if (_queue.PendingCount == 0) + return; + + // User is idle when: no active session loop AND no recent activity + var hasActiveLoop = _sessionTracker.HasActiveUserLoop("blazor-session"); + var isRecentlyActive = _userActivityMonitor.IsUserActive(IdleThreshold); + + if (hasActiveLoop || isRecentlyActive) + return; + + var notifications = await _queue.DrainAsync(CancellationToken.None); + if (notifications.Count == 0) + return; + + _logger.LogInformation( + "Flushing {Count} A2A notification(s) to user after idle period", + notifications.Count); + + var content = FormatNotifications(notifications); + + var reply = new AgentReply + { + Content = content, + SessionId = A2AInboundSessionId, + AgentName = A2AInboundAgentName, + IsFinal = true + }; + + var envelope = reply.ToEnvelope(source: _agent.Name); + await _publisher.PublishAsync(UserProxyTopics.UserResponse, envelope, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error flushing A2A notifications"); + } + } + + private static string FormatNotifications(IReadOnlyList notifications) + { + if (notifications.Count == 1) + { + var n = notifications[0]; + return $"**Agent \"{n.CallerName}\" reached out** (skill: {n.SkillId ?? "general"})\n\n{n.Summary}"; + } + + var sb = new StringBuilder(); + sb.AppendLine($"**While you were away, {notifications.Count} agents reached out:**\n"); + for (var i = 0; i < notifications.Count; i++) + { + var n = notifications[i]; + sb.AppendLine($"{i + 1}. **{n.CallerName}** (skill: {n.SkillId ?? "general"}) — {n.Summary}"); + } + return sb.ToString().TrimEnd(); + } +} diff --git a/src/RockBot.Host/ServiceCollectionExtensions.cs b/src/RockBot.Host/ServiceCollectionExtensions.cs index 30ce547..a638ffc 100644 --- a/src/RockBot.Host/ServiceCollectionExtensions.cs +++ b/src/RockBot.Host/ServiceCollectionExtensions.cs @@ -44,6 +44,8 @@ public static IServiceCollection AddRockBotHost( services.Configure(_ => { }); services.Configure(_ => { }); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); return services; diff --git a/src/RockBot.UserProxy.Blazor/Pages/Chat.razor b/src/RockBot.UserProxy.Blazor/Pages/Chat.razor index 94e24e7..f3d7ab0 100644 --- a/src/RockBot.UserProxy.Blazor/Pages/Chat.razor +++ b/src/RockBot.UserProxy.Blazor/Pages/Chat.razor @@ -47,6 +47,13 @@ ⏱️ } + @foreach (var a2a in statusIndicators.Where(i => i.Category == MessageCategory.A2AActivity)) + { + + 🔗 + + } @@ -540,7 +547,7 @@ private static string GetCategoryLabel(MessageCategory category) => category switch { MessageCategory.SubagentActivity => "Subagent", - MessageCategory.A2AActivity => "A2A", + MessageCategory.A2AActivity => "Inbound A2A", MessageCategory.ScheduledSystem => "Background Task", _ => "Agent" }; diff --git a/src/RockBot.UserProxy.Blazor/Services/BlazorUserFrontend.cs b/src/RockBot.UserProxy.Blazor/Services/BlazorUserFrontend.cs index cc19571..88095db 100644 --- a/src/RockBot.UserProxy.Blazor/Services/BlazorUserFrontend.cs +++ b/src/RockBot.UserProxy.Blazor/Services/BlazorUserFrontend.cs @@ -55,6 +55,10 @@ private MessageCategory CategorizeReply(AgentReply reply) if (reply.AgentName?.StartsWith("subagent-", StringComparison.OrdinalIgnoreCase) == true) return MessageCategory.SubagentActivity; + // Inbound A2A notifications use a dedicated session ID regardless of IsFinal + if (reply.SessionId?.StartsWith("a2a-inbound", StringComparison.OrdinalIgnoreCase) == true) + return MessageCategory.A2AActivity; + if (reply.IsFinal) return MessageCategory.PrimaryFinal; diff --git a/src/RockBot.UserProxy.Blazor/wwwroot/css/app.css b/src/RockBot.UserProxy.Blazor/wwwroot/css/app.css index ff807cb..60d3928 100644 --- a/src/RockBot.UserProxy.Blazor/wwwroot/css/app.css +++ b/src/RockBot.UserProxy.Blazor/wwwroot/css/app.css @@ -193,7 +193,8 @@ body { } .status-indicator.subagent-indicator, -.status-indicator.task-indicator { +.status-indicator.task-indicator, +.status-indicator.a2a-indicator { animation: status-pulse 2s ease-in-out infinite; } diff --git a/tests/RockBot.A2A.Tests/IdentityVerificationMiddlewareTests.cs b/tests/RockBot.A2A.Tests/IdentityVerificationMiddlewareTests.cs new file mode 100644 index 0000000..ac3fae0 --- /dev/null +++ b/tests/RockBot.A2A.Tests/IdentityVerificationMiddlewareTests.cs @@ -0,0 +1,124 @@ +using Microsoft.Extensions.Logging.Abstractions; +using RockBot.Host; +using RockBot.Messaging; + +namespace RockBot.A2A.Tests; + +[TestClass] +public class IdentityVerificationMiddlewareTests +{ + [TestMethod] + public async Task A2AMessage_StoresVerifiedIdentity() + { + var verifier = new NameBasedAgentIdentityVerifier(); + var middleware = new IdentityVerificationMiddleware( + verifier, NullLogger.Instance); + + var envelope = TestEnvelopeHelper.CreateEnvelope( + new AgentTaskRequest + { + TaskId = "t1", + Skill = "test", + Message = new AgentMessage { Role = "user", Parts = [] } + }, + source: "CallerAgent"); + + var context = new MessageHandlerContext + { + Envelope = envelope, + Agent = new AgentIdentity("RockBot"), + Services = null!, + CancellationToken = CancellationToken.None + }; + + var nextCalled = false; + await middleware.InvokeAsync(context, _ => + { + nextCalled = true; + return Task.CompletedTask; + }); + + Assert.IsTrue(nextCalled); + Assert.IsTrue(context.Items.ContainsKey(VerifiedAgentIdentity.ContextKey)); + var identity = (VerifiedAgentIdentity)context.Items[VerifiedAgentIdentity.ContextKey]; + Assert.AreEqual("CallerAgent", identity.AgentId); + Assert.IsTrue(identity.IsSelfAsserted); + } + + [TestMethod] + public async Task NonA2AMessage_PassesThroughWithoutVerification() + { + var verifier = new NameBasedAgentIdentityVerifier(); + var middleware = new IdentityVerificationMiddleware( + verifier, NullLogger.Instance); + + // UserMessage is not an A2A message type + var envelope = new MessageEnvelope + { + MessageId = "m1", + MessageType = "RockBot.UserProxy.UserMessage", + Body = new byte[] { }, + Source = "user", + Timestamp = DateTimeOffset.UtcNow + }; + + var context = new MessageHandlerContext + { + Envelope = envelope, + Agent = new AgentIdentity("RockBot"), + Services = null!, + CancellationToken = CancellationToken.None + }; + + var nextCalled = false; + await middleware.InvokeAsync(context, _ => + { + nextCalled = true; + return Task.CompletedTask; + }); + + Assert.IsTrue(nextCalled); + Assert.IsFalse(context.Items.ContainsKey(VerifiedAgentIdentity.ContextKey)); + } + + [TestMethod] + public async Task VerificationFailure_DeadLettersMessage() + { + var verifier = new FailingVerifier(); + var middleware = new IdentityVerificationMiddleware( + verifier, NullLogger.Instance); + + var envelope = TestEnvelopeHelper.CreateEnvelope( + new AgentTaskRequest + { + TaskId = "t1", + Skill = "test", + Message = new AgentMessage { Role = "user", Parts = [] } + }, + source: "BadAgent"); + + var context = new MessageHandlerContext + { + Envelope = envelope, + Agent = new AgentIdentity("RockBot"), + Services = null!, + CancellationToken = CancellationToken.None + }; + + var nextCalled = false; + await middleware.InvokeAsync(context, _ => + { + nextCalled = true; + return Task.CompletedTask; + }); + + Assert.IsFalse(nextCalled, "Next should not be called when verification fails"); + Assert.AreEqual(MessageResult.DeadLetter, context.Result); + } + + private sealed class FailingVerifier : IAgentIdentityVerifier + { + public Task VerifyAsync(MessageEnvelope envelope, CancellationToken ct) + => throw new InvalidOperationException("Verification failed"); + } +} diff --git a/tests/RockBot.A2A.Tests/IdentityVerificationTests.cs b/tests/RockBot.A2A.Tests/IdentityVerificationTests.cs new file mode 100644 index 0000000..d85d0ec --- /dev/null +++ b/tests/RockBot.A2A.Tests/IdentityVerificationTests.cs @@ -0,0 +1,61 @@ +using RockBot.Messaging; + +namespace RockBot.A2A.Tests; + +[TestClass] +public class IdentityVerificationTests +{ + [TestMethod] + public async Task NameBasedVerifier_ReturnsIdentityFromSource() + { + var verifier = new NameBasedAgentIdentityVerifier(); + var envelope = TestEnvelopeHelper.CreateEnvelope( + new AgentTaskRequest + { + TaskId = "t1", + Skill = "test", + Message = new AgentMessage { Role = "user", Parts = [] } + }, + source: "TestAgent"); + + var identity = await verifier.VerifyAsync(envelope, CancellationToken.None); + + Assert.AreEqual("TestAgent", identity.AgentId); + Assert.AreEqual("TestAgent", identity.DisplayName); + Assert.AreEqual("self", identity.Issuer); + Assert.IsTrue(identity.IsSelfAsserted); + } + + [TestMethod] + public async Task NameBasedVerifier_ThrowsWhenSourceEmpty() + { + var verifier = new NameBasedAgentIdentityVerifier(); + var envelope = TestEnvelopeHelper.CreateEnvelope( + new AgentTaskRequest + { + TaskId = "t1", + Skill = "test", + Message = new AgentMessage { Role = "user", Parts = [] } + }, + source: ""); + + // Source is required so we need to construct an envelope with empty source manually + var emptySourceEnvelope = new MessageEnvelope + { + MessageId = "test", + MessageType = typeof(AgentTaskRequest).FullName!, + Body = envelope.Body, + Source = "", + Timestamp = DateTimeOffset.UtcNow + }; + + await Assert.ThrowsExceptionAsync( + () => verifier.VerifyAsync(emptySourceEnvelope, CancellationToken.None)); + } + + [TestMethod] + public void VerifiedAgentIdentity_ContextKey_IsCorrect() + { + Assert.AreEqual("verified-identity", VerifiedAgentIdentity.ContextKey); + } +} diff --git a/tests/RockBot.A2A.Tests/TrustStoreTests.cs b/tests/RockBot.A2A.Tests/TrustStoreTests.cs new file mode 100644 index 0000000..47ea606 --- /dev/null +++ b/tests/RockBot.A2A.Tests/TrustStoreTests.cs @@ -0,0 +1,104 @@ +namespace RockBot.A2A.Tests; + +[TestClass] +public class TrustStoreTests +{ + private string _tempFile = null!; + + [TestInitialize] + public void Setup() + { + _tempFile = Path.Combine(Path.GetTempPath(), $"trust-test-{Guid.NewGuid():N}.json"); + } + + [TestCleanup] + public void Cleanup() + { + if (File.Exists(_tempFile)) + File.Delete(_tempFile); + } + + [TestMethod] + public async Task GetOrCreate_NewAgent_ReturnsObserveLevel() + { + var store = new FileAgentTrustStore(_tempFile); + + var entry = await store.GetOrCreateAsync("Agent1", CancellationToken.None); + + Assert.AreEqual("Agent1", entry.AgentId); + Assert.AreEqual(AgentTrustLevel.Observe, entry.Level); + Assert.AreEqual(0, entry.InteractionCount); + } + + [TestMethod] + public async Task GetOrCreate_ExistingAgent_ReturnsSameEntry() + { + var store = new FileAgentTrustStore(_tempFile); + + var first = await store.GetOrCreateAsync("Agent1", CancellationToken.None); + var second = await store.GetOrCreateAsync("Agent1", CancellationToken.None); + + Assert.AreEqual(first.AgentId, second.AgentId); + Assert.AreEqual(first.FirstSeen, second.FirstSeen); + } + + [TestMethod] + public async Task Update_PersistsChanges() + { + var store = new FileAgentTrustStore(_tempFile); + + var entry = await store.GetOrCreateAsync("Agent1", CancellationToken.None); + var updated = entry with + { + Level = AgentTrustLevel.Act, + InteractionCount = 5, + ApprovedSkills = ["notify-user"] + }; + await store.UpdateAsync(updated, CancellationToken.None); + + // Reload from file in a new store instance + var store2 = new FileAgentTrustStore(_tempFile); + var loaded = await store2.GetOrCreateAsync("Agent1", CancellationToken.None); + + Assert.AreEqual(AgentTrustLevel.Act, loaded.Level); + Assert.AreEqual(5, loaded.InteractionCount); + Assert.AreEqual(1, loaded.ApprovedSkills.Count); + Assert.AreEqual("notify-user", loaded.ApprovedSkills[0]); + } + + [TestMethod] + public async Task List_ReturnsAllEntries() + { + var store = new FileAgentTrustStore(_tempFile); + + await store.GetOrCreateAsync("Agent1", CancellationToken.None); + await store.GetOrCreateAsync("Agent2", CancellationToken.None); + + var entries = await store.ListAsync(CancellationToken.None); + + Assert.AreEqual(2, entries.Count); + } + + [TestMethod] + public async Task NullPath_WorksInMemoryOnly() + { + var store = new FileAgentTrustStore(null); + + var entry = await store.GetOrCreateAsync("Agent1", CancellationToken.None); + Assert.AreEqual("Agent1", entry.AgentId); + + var list = await store.ListAsync(CancellationToken.None); + Assert.AreEqual(1, list.Count); + } + + [TestMethod] + public async Task CaseInsensitive_LookupByAgentId() + { + var store = new FileAgentTrustStore(_tempFile); + + await store.GetOrCreateAsync("TestAgent", CancellationToken.None); + var entry = await store.GetOrCreateAsync("testagent", CancellationToken.None); + + Assert.AreEqual("TestAgent", entry.AgentId); + } +} diff --git a/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs b/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs index 66d416a..7e4df30 100644 --- a/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs +++ b/tests/RockBot.Host.Tests/AgentProfileExtensionsTests.cs @@ -49,6 +49,7 @@ public void WithProfile_RegistersHostedService() var services = new ServiceCollection(); services.AddLogging(); services.AddSingleton(new StubSubscriber()); + services.AddSingleton(new StubPublisher()); services.AddRockBotHost(agent => { agent.WithIdentity("test-agent"); @@ -80,6 +81,13 @@ public void WithProfile_CustomOptions_ConfiguresBasePath() Assert.AreEqual("custom-path", options.Value.BasePath); } + private sealed class StubPublisher : IMessagePublisher + { + public Task PublishAsync(string topic, MessageEnvelope envelope, CancellationToken ct = default) + => Task.CompletedTask; + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } + private sealed class StubSubscriber : IMessageSubscriber { public Task SubscribeAsync(string topic, string subscriptionName, diff --git a/tests/RockBot.Host.Tests/InboundNotificationQueueTests.cs b/tests/RockBot.Host.Tests/InboundNotificationQueueTests.cs new file mode 100644 index 0000000..612606c --- /dev/null +++ b/tests/RockBot.Host.Tests/InboundNotificationQueueTests.cs @@ -0,0 +1,65 @@ +using RockBot.Host; + +namespace RockBot.Host.Tests; + +[TestClass] +public class InboundNotificationQueueTests +{ + [TestMethod] + public async Task Enqueue_IncrementsPendingCount() + { + var queue = new InboundNotificationQueue(); + Assert.AreEqual(0, queue.PendingCount); + + await queue.EnqueueAsync(CreateNotification("t1", "Agent1"), CancellationToken.None); + + Assert.AreEqual(1, queue.PendingCount); + } + + [TestMethod] + public async Task Drain_ReturnsAllAndClears() + { + var queue = new InboundNotificationQueue(); + await queue.EnqueueAsync(CreateNotification("t1", "Agent1"), CancellationToken.None); + await queue.EnqueueAsync(CreateNotification("t2", "Agent2"), CancellationToken.None); + + var items = await queue.DrainAsync(CancellationToken.None); + + Assert.AreEqual(2, items.Count); + Assert.AreEqual(0, queue.PendingCount); + } + + [TestMethod] + public async Task Drain_WhenEmpty_ReturnsEmptyList() + { + var queue = new InboundNotificationQueue(); + + var items = await queue.DrainAsync(CancellationToken.None); + + Assert.AreEqual(0, items.Count); + } + + [TestMethod] + public async Task Drain_PreservesOrder() + { + var queue = new InboundNotificationQueue(); + await queue.EnqueueAsync(CreateNotification("t1", "First"), CancellationToken.None); + await queue.EnqueueAsync(CreateNotification("t2", "Second"), CancellationToken.None); + await queue.EnqueueAsync(CreateNotification("t3", "Third"), CancellationToken.None); + + var items = await queue.DrainAsync(CancellationToken.None); + + Assert.AreEqual("First", items[0].CallerName); + Assert.AreEqual("Second", items[1].CallerName); + Assert.AreEqual("Third", items[2].CallerName); + } + + private static InboundNotification CreateNotification(string taskId, string callerName) => + new() + { + TaskId = taskId, + CallerName = callerName, + Summary = $"Test notification from {callerName}", + ReceivedAt = DateTimeOffset.UtcNow + }; +} diff --git a/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs b/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs index 04ce3b9..d232d91 100644 --- a/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs +++ b/tests/RockBot.Host.Tests/ServiceCollectionExtensionsTests.cs @@ -27,6 +27,7 @@ public void AddRockBotHost_RegistersHostedService() var services = new ServiceCollection(); services.AddLogging(); services.AddSingleton(new StubSubscriber()); + services.AddSingleton(new StubPublisher()); services.AddRockBotHost(agent => agent.WithIdentity("test")); var provider = services.BuildServiceProvider(); @@ -62,6 +63,13 @@ public void AddRockBotHost_RegistersTypeResolver() Assert.IsNotNull(resolver); } + private sealed class StubPublisher : IMessagePublisher + { + public Task PublishAsync(string topic, MessageEnvelope envelope, CancellationToken ct = default) + => Task.CompletedTask; + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } + private sealed class StubSubscriber : IMessageSubscriber { public Task SubscribeAsync(string topic, string subscriptionName,