From a1745e7731a92cb503534f8138ab811181ae332e Mon Sep 17 00:00:00 2001 From: Benjamin SPETH Date: Mon, 1 Sep 2025 11:00:21 +0200 Subject: [PATCH 1/3] v5.5.0 --- docs/CHANGELOG.md | 6 + .../Exceptions/SenderNotFoundException.cs | 10 +- .../IMessagePublisher.cs | 7 +- .../IsExternalInit.cs | 11 + src/Ev.ServiceBus/Dispatch/DispatchSender.cs | 217 ++---------------- .../Dispatch/MessageDispatcher.cs | 12 +- .../Dispatch/Outbox/IOutboxRepository.cs | 12 + .../Dispatch/Outbox/OutboxDispatchSender.cs | 95 ++++++++ .../Outbox/OutboxMessageDispatcher.cs | 13 ++ .../Dispatch/Outbox/OutboxMessagePublisher.cs | 75 ++++++ .../Dispatch/ServiceBusMessageFactory.cs | 106 +++++++++ .../Dispatch/ServiceBusMessageSender.cs | 100 ++++++++ src/Ev.ServiceBus/Ev.ServiceBus.csproj | 1 + .../Isolation/IsolationService.cs | 2 +- .../Factories/MessageSenderFactory.cs | 10 +- .../Management/ServiceBusRegistry.cs | 20 +- src/Ev.ServiceBus/ServiceBusBuilder.cs | 11 + .../ServiceCollectionExtensions.cs | 2 + .../Core/DeactivatedSenderTest.cs | 2 +- 19 files changed, 483 insertions(+), 229 deletions(-) create mode 100644 src/Ev.ServiceBus.Abstractions/IsExternalInit.cs create mode 100644 src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs create mode 100644 src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs create mode 100644 src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs create mode 100644 src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessagePublisher.cs create mode 100644 src/Ev.ServiceBus/Dispatch/ServiceBusMessageFactory.cs create mode 100644 src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 933eb6e..be0d483 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 5.5.0 +- Added Outbox pattern support + +## 5.4.0 +- Added Isolation feature V2 + ## 5.3.1 - Changed - Fix usage of sent counter on receiver instead of received counter diff --git a/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs b/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs index 6458e77..5419593 100644 --- a/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs +++ b/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs @@ -5,13 +5,13 @@ namespace Ev.ServiceBus.Abstractions; [Serializable] public class SenderNotFoundException : Exception { - public SenderNotFoundException(ClientType clientType, string topicName) + public SenderNotFoundException(string resourceId) : base( - $"The {clientType.ToString()} '{topicName}' you tried to retrieve was not found. " - + $"Verify your configuration to make sure the {clientType.ToString()} is properly registered.") + $"The '{resourceId}' you tried to retrieve was not found. " + + $"Verify your configuration to make sure the resource is properly registered.") { - TopicName = topicName; + ResourceId = resourceId; } - public string TopicName { get; } + public string ResourceId { get; } } \ No newline at end of file diff --git a/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs b/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs index 37f7aac..6ec1f3c 100644 --- a/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs +++ b/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace Ev.ServiceBus.Abstractions; @@ -9,7 +10,7 @@ public interface IMessagePublisher /// /// The object to send through Service Bus /// A type of object that is registered within Ev.ServiceBus - void Publish(TMessagePayload messageDto); + Task Publish(TMessagePayload messageDto); /// /// Temporarily stores the object to send through Service Bus until is called. @@ -17,7 +18,7 @@ public interface IMessagePublisher /// The object to send through Service Bus /// The sessionId to attach to the outgoing message /// A type of object that is registered within Ev.ServiceBus - void Publish(TMessagePayload messageDto, string sessionId); + Task Publish(TMessagePayload messageDto, string sessionId); /// /// Temporarily stores the object to send through Service Bus until is called. @@ -25,5 +26,5 @@ public interface IMessagePublisher /// The object to send through Service Bus /// Configurator of message context /// A type of object that is registered within Ev.ServiceBus - void Publish(TMessagePayload messageDto, Action messageContextConfiguration); + Task Publish(TMessagePayload messageDto, Action messageContextConfiguration); } \ No newline at end of file diff --git a/src/Ev.ServiceBus.Abstractions/IsExternalInit.cs b/src/Ev.ServiceBus.Abstractions/IsExternalInit.cs new file mode 100644 index 0000000..37f9b76 --- /dev/null +++ b/src/Ev.ServiceBus.Abstractions/IsExternalInit.cs @@ -0,0 +1,11 @@ +using System.ComponentModel; + +namespace System.Runtime.CompilerServices; + +/// +/// Reserved to be used by the compiler for tracking metadata. +/// This class should not be used by developers in source code. +/// +[EditorBrowsable(EditorBrowsableState.Never)] +internal static class IsExternalInit { +} diff --git a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs index 480402a..1a12494 100644 --- a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs +++ b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs @@ -3,40 +3,21 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Azure.Messaging.ServiceBus; using Ev.ServiceBus.Abstractions; -using Ev.ServiceBus.Abstractions.Extensions; -using Ev.ServiceBus.Abstractions.MessageReception; -using Ev.ServiceBus.Diagnostics; -using Ev.ServiceBus.Management; -using Microsoft.Extensions.Options; namespace Ev.ServiceBus.Dispatch; public class DispatchSender : IDispatchSender { - private const int MaxMessagePerSend = 100; - private readonly IMessagePayloadSerializer _messagePayloadSerializer; - private readonly ServiceBusRegistry _dispatchRegistry; - private readonly ServiceBusRegistry _registry; - private readonly IMessageMetadataAccessor _messageMetadataAccessor; - private readonly IEnumerable _dispatchCustomizers; - private readonly ServiceBusOptions _serviceBusOptions; + private readonly ServiceBusMessageFactory _messageFactory; + private readonly ServiceBusMessageSender _serviceBusMessageSender; public DispatchSender( - ServiceBusRegistry registry, - IMessagePayloadSerializer messagePayloadSerializer, - ServiceBusRegistry dispatchRegistry, - IMessageMetadataAccessor messageMetadataAccessor, - IEnumerable dispatchCustomizers, - IOptions serviceBusOptions) + ServiceBusMessageFactory messageFactory, + ServiceBusMessageSender serviceBusMessageSender) { - _registry = registry; - _messagePayloadSerializer = messagePayloadSerializer; - _dispatchRegistry = dispatchRegistry; - _messageMetadataAccessor = messageMetadataAccessor; - _dispatchCustomizers = dispatchCustomizers; - _serviceBusOptions = serviceBusOptions.Value; + _messageFactory = messageFactory; + _serviceBusMessageSender = serviceBusMessageSender; } /// @@ -50,20 +31,14 @@ public async Task SendDispatch(object messagePayload, CancellationToken token = /// public async Task SendDispatch(Abstractions.Dispatch messagePayload, CancellationToken token = default) { - var dispatches = CreateMessagesToSend([messagePayload]); + var dispatches = _messageFactory.CreateMessagesToSend([messagePayload]); - foreach (var messagePerResource in dispatches) - { - var message = messagePerResource.Messages.Single(); + var messagePerResource = dispatches.Single(); - await messagePerResource.Sender.SendMessageAsync(message.Message, token); - ServiceBusMeter.IncrementSentCounter( - 1, - messagePerResource.Sender.ClientType.ToString(), - messagePerResource.Sender.Name, - message.Message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() - ); - } + await _serviceBusMessageSender.SendMessages( + messagePerResource.ResourceId, + messagePerResource.Messages, + token); } /// @@ -86,48 +61,10 @@ public async Task SendDispatches(IEnumerable messagePaylo throw new ArgumentNullException(nameof(messagePayloads)); } - var dispatches = CreateMessagesToSend(messagePayloads); + var dispatches = _messageFactory.CreateMessagesToSend(messagePayloads); foreach (var messagesPerResource in dispatches) { - await BatchAndSendMessages(messagesPerResource, token, async (sender, batch) => - { - await sender.SendMessagesAsync(batch, token); - }); - } - } - - private async Task BatchAndSendMessages(MessagesPerResource dispatches, CancellationToken token, Func senderAction) - { - var batches = new List(); - var batch = await dispatches.Sender.CreateMessageBatchAsync(token); - batches.Add(batch); - foreach (var messageToSend in dispatches.Messages) - { - ServiceBusMeter.IncrementSentCounter( - 1, - dispatches.Sender.ClientType.ToString(), - dispatches.Sender.Name, - messageToSend.Message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() - ); - - if (batch.TryAddMessage(messageToSend.Message)) - { - continue; - } - batch = await dispatches.Sender.CreateMessageBatchAsync(token); - batches.Add(batch); - if (batch.TryAddMessage(messageToSend.Message)) - { - continue; - } - - throw new ArgumentOutOfRangeException("A message is too big to fit in a single batch"); - } - - foreach (var pageMessages in batches) - { - await senderAction.Invoke(dispatches.Sender, pageMessages); - pageMessages.Dispose(); + await _serviceBusMessageSender.SendMessages(messagesPerResource.ResourceId, messagesPerResource.Messages, token); } } @@ -151,132 +88,10 @@ public async Task ScheduleDispatches(IEnumerable messageP throw new ArgumentNullException(nameof(messagePayloads)); } - var dispatches = CreateMessagesToSend(messagePayloads); + var dispatches = _messageFactory.CreateMessagesToSend(messagePayloads); foreach (var messagesPerResource in dispatches) { - await PaginateAndSendMessages(messagesPerResource, async (sender, page) => - { - await sender.ScheduleMessagesAsync(page, scheduledEnqueueTime, token); - }); - } - } - - private async Task PaginateAndSendMessages(MessagesPerResource dispatches, Func, Task> senderAction) - { - var paginatedMessages = dispatches.Messages.Select(o => o.Message) - .Select((x, i) => new - { - Item = x, - Index = i - }) - .GroupBy(x => x.Index / MaxMessagePerSend, x => x.Item); - - foreach (var pageMessages in paginatedMessages) - { - foreach (var message in pageMessages) - { - ServiceBusMeter.IncrementSentCounter( - 1, - dispatches.Sender.ClientType.ToString(), - dispatches.Sender.Name, - message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() - ); - } - - await senderAction.Invoke(dispatches.Sender, pageMessages.Select(m => m).ToArray()); - } - } - - private class MessagesPerResource - { - public MessageToSend[] Messages { get; set; } - public ClientType ClientType { get; set; } - public string ResourceId { get; set; } - public IMessageSender Sender { get; set; } - } - - private class MessageToSend - { - public MessageToSend(ServiceBusMessage message, MessageDispatchRegistration registration) - { - Message = message; - Registration = registration; - } - - public ServiceBusMessage Message { get; } - public MessageDispatchRegistration Registration { get; } - } - - private MessagesPerResource[] CreateMessagesToSend(IEnumerable messagePayloads) - { - var dispatches = - ( - from dispatch in messagePayloads - // the same dispatch can be published to several senders - let registrations = _dispatchRegistry.GetDispatchRegistrations(dispatch.Payload.GetType()) - from eventPublicationRegistration in registrations - let message = CreateMessage(eventPublicationRegistration, dispatch) - select new MessageToSend(message, eventPublicationRegistration) - ) - .ToArray(); - - var messagesPerResource = ( - from dispatch in dispatches - group dispatch by new { dispatch.Registration.Options.ClientType, dispatch.Registration.Options.ResourceId } into gr - let sender = _registry.GetMessageSender(gr.Key.ClientType, gr.Key.ResourceId) - select new MessagesPerResource() - { - Messages = gr.ToArray(), - ClientType = gr.Key.ClientType, - ResourceId = gr.Key.ResourceId, - Sender = sender - }).ToArray(); - - return messagesPerResource; - } - - private ServiceBusMessage CreateMessage( - MessageDispatchRegistration registration, - Abstractions.Dispatch dispatch) - { - var result = _messagePayloadSerializer.SerializeBody(dispatch.Payload); - var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId); - - dispatch.ApplicationProperties.Remove(UserProperties.PayloadTypeIdProperty); - foreach (var dispatchApplicationProperty in dispatch.ApplicationProperties) - { - message.ApplicationProperties[dispatchApplicationProperty.Key] = dispatchApplicationProperty.Value; - } - - message.SessionId = dispatch.SessionId; - - var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString(); - message.CorrelationId = dispatch.CorrelationId ?? originalCorrelationId; - - var originalIsolationKey = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationKey(); - message.SetIsolationKey(originalIsolationKey ?? _serviceBusOptions.Settings.IsolationSettings.IsolationKey); - - var originalIsolationApps = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationApps() ?? []; - message.SetIsolationApps(originalIsolationApps); - - if (dispatch.DiagnosticId != null) - { - message.SetDiagnosticIdIfIsNot(dispatch.DiagnosticId); - } - if (!string.IsNullOrWhiteSpace(dispatch.MessageId)) - { - message.MessageId = dispatch.MessageId; - } - - foreach (var customizer in registration.OutgoingMessageCustomizers) - { - customizer?.Invoke(message, dispatch.Payload); - } - - foreach (var dispatchCustomizer in _dispatchCustomizers) - { - dispatchCustomizer.ExtendDispatch(message, dispatch.Payload); + await _serviceBusMessageSender.ScheduleMessages(messagesPerResource, scheduledEnqueueTime, token); } - return message; } } \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs b/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs index 89552a3..0bb5796 100644 --- a/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs +++ b/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs @@ -31,7 +31,7 @@ public async Task ExecuteDispatches(CancellationToken token) } /// - public void Publish(TMessageDto messageDto) + public Task Publish(TMessageDto messageDto) { if (messageDto == null) { @@ -42,10 +42,12 @@ public void Publish(TMessageDto messageDto) { DiagnosticId = Activity.Current?.Id }); + + return Task.CompletedTask; } /// - public void Publish(TMessagePayload messageDto, string sessionId) + public Task Publish(TMessagePayload messageDto, string sessionId) { if (messageDto == null) { @@ -62,10 +64,12 @@ public void Publish(TMessagePayload messageDto, string sessionI SessionId = sessionId, DiagnosticId = Activity.Current?.Id }); + + return Task.CompletedTask; } /// - public void Publish( + public Task Publish( TMessagePayload messageDto, Action messageContextConfiguration) { @@ -90,5 +94,7 @@ public void Publish( MessageId = context.MessageId, DiagnosticId = context.DiagnosticId ?? Activity.Current?.Id }); + + return Task.CompletedTask; } } \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs b/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs new file mode 100644 index 0000000..646574f --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs @@ -0,0 +1,12 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public interface IOutboxRepository +{ + Task Add(string resourceId, ServiceBusMessage message, CancellationToken token); + Task AddScheduled(string resourceId, DateTimeOffset scheduledEnqueueTime, ServiceBusMessage message, CancellationToken token); +} diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs new file mode 100644 index 0000000..391fd32 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Ev.ServiceBus.Abstractions; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public class OutboxDispatchSender : IDispatchSender +{ + private readonly IDispatchSender _underlyingDispatchSender; + private readonly IOutboxRepository _outboxRepository; + private readonly ServiceBusMessageFactory _messageFactory; + + public OutboxDispatchSender( + IDispatchSender underlyingDispatchSender, + IOutboxRepository outboxRepository, + ServiceBusMessageFactory serviceBusMessageFactory) + { + _underlyingDispatchSender = underlyingDispatchSender; + _outboxRepository = outboxRepository; + _messageFactory = serviceBusMessageFactory; + } + + public async Task SendDispatch(object messagePayload, CancellationToken token = default) + { + await SendDispatch(new Abstractions.Dispatch(messagePayload), token); + } + + public async Task SendDispatch(Abstractions.Dispatch messagePayload, CancellationToken token = default) + { + var messagesPerResources = _messageFactory.CreateMessagesToSend([messagePayload]); + + foreach (var messagesPerResource in messagesPerResources) + { + foreach (var message in messagesPerResource.Messages) + { + await _outboxRepository.Add(messagesPerResource.ResourceId, message, token); + } + } + } + + public async Task SendDispatches(IEnumerable messagePayloads, CancellationToken token = default) + { + if (messagePayloads == null) + { + throw new ArgumentNullException(nameof(messagePayloads)); + } + + var dispatches = messagePayloads.Select(o => new Abstractions.Dispatch(o)).ToArray(); + await SendDispatches(dispatches, token); + } + + public async Task SendDispatches(IEnumerable messagePayloads, CancellationToken token = default) + { + var messagesPerResources = _messageFactory.CreateMessagesToSend(messagePayloads); + + foreach (var messagesPerResource in messagesPerResources) + { + foreach (var message in messagesPerResource.Messages) + { + await _outboxRepository.Add(messagesPerResource.ResourceId, message, token); + } + } + } + + public async Task ScheduleDispatches(IEnumerable messagePayloads, DateTimeOffset scheduledEnqueueTime, + CancellationToken token = default) + { + if (messagePayloads == null) + { + throw new ArgumentNullException(nameof(messagePayloads)); + } + + var dispatches = messagePayloads.Select(o => new Abstractions.Dispatch(o)).ToArray(); + await ScheduleDispatches(dispatches, scheduledEnqueueTime, token); + } + + public async Task ScheduleDispatches( + IEnumerable messagePayloads, + DateTimeOffset scheduledEnqueueTime, + CancellationToken token = default) + { + var messagesPerResources = _messageFactory.CreateMessagesToSend(messagePayloads); + + foreach (var messagesPerResource in messagesPerResources) + { + foreach (var message in messagesPerResource.Messages) + { + await _outboxRepository.AddScheduled(messagesPerResource.ResourceId, scheduledEnqueueTime, message, token); + } + } + } +} \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs new file mode 100644 index 0000000..2f82203 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs @@ -0,0 +1,13 @@ +using System.Threading; +using System.Threading.Tasks; +using Ev.ServiceBus.Abstractions; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public class OutboxMessageDispatcher : IMessageDispatcher +{ + public Task ExecuteDispatches(CancellationToken token) + { + return Task.CompletedTask; + } +} diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessagePublisher.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessagePublisher.cs new file mode 100644 index 0000000..8732160 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessagePublisher.cs @@ -0,0 +1,75 @@ +using System; +using System.Diagnostics; +using System.Threading.Tasks; +using Ev.ServiceBus.Abstractions; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public class OutboxMessagePublisher : IMessagePublisher +{ + private readonly IMessagePublisher _underlyingMessagePublisher; + private readonly IDispatchSender _dispatchSender; + + public OutboxMessagePublisher( + IMessagePublisher underlyingMessagePublisher, + IDispatchSender dispatchSender) + { + _underlyingMessagePublisher = underlyingMessagePublisher; + _dispatchSender = dispatchSender; + } + + public async Task Publish(TMessagePayload messageDto) + { + if (messageDto == null) + { + throw new ArgumentNullException(nameof(messageDto)); + } + + await _dispatchSender.SendDispatch(messageDto!); + } + + public async Task Publish(TMessagePayload messageDto, string sessionId) + { + if (messageDto == null) + { + throw new ArgumentNullException(nameof(messageDto)); + } + + if (sessionId == null) + { + throw new ArgumentNullException(nameof(sessionId)); + } + + await _dispatchSender.SendDispatch(new Abstractions.Dispatch(messageDto) + { + SessionId = sessionId, + DiagnosticId = Activity.Current?.Id + }); + } + + public async Task Publish(TMessagePayload messageDto, Action messageContextConfiguration) + { + if (messageDto == null) + { + throw new ArgumentNullException(nameof(messageDto)); + } + + if (messageContextConfiguration == null) + { + throw new ArgumentNullException(nameof(messageContextConfiguration)); + } + + var context = new DispatchContext(); + + messageContextConfiguration.Invoke(context); + + var dispatch = new Abstractions.Dispatch(messageDto, context) + { + SessionId = context.SessionId, + CorrelationId = context.CorrelationId, + MessageId = context.MessageId, + DiagnosticId = context.DiagnosticId ?? Activity.Current?.Id + }; + await _dispatchSender.SendDispatch(dispatch); + } +} diff --git a/src/Ev.ServiceBus/Dispatch/ServiceBusMessageFactory.cs b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageFactory.cs new file mode 100644 index 0000000..21f77c5 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageFactory.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Azure.Messaging.ServiceBus; +using Ev.ServiceBus.Abstractions; +using Ev.ServiceBus.Abstractions.Extensions; +using Ev.ServiceBus.Abstractions.MessageReception; +using Ev.ServiceBus.Management; +using Microsoft.Extensions.Options; + +namespace Ev.ServiceBus.Dispatch; + +public record MessagesPerResource(ServiceBusMessage[] Messages, string ResourceId); + +public class ServiceBusMessageFactory +{ + private readonly ServiceBusRegistry _registry; + private readonly IMessagePayloadSerializer _messagePayloadSerializer; + private readonly IMessageMetadataAccessor _messageMetadataAccessor; + private readonly IEnumerable _dispatchCustomizers; + private readonly ServiceBusOptions _serviceBusOptions; + + public ServiceBusMessageFactory( + ServiceBusRegistry serviceBusRegistry, + IMessagePayloadSerializer messagePayloadSerializer, + IMessageMetadataAccessor messageMetadataAccessor, + IEnumerable dispatchCustomizers, + IOptions serviceBusOptions + ) + { + _registry = serviceBusRegistry; + _messagePayloadSerializer = messagePayloadSerializer; + _messageMetadataAccessor = messageMetadataAccessor; + _dispatchCustomizers = dispatchCustomizers; + _serviceBusOptions = serviceBusOptions.Value; + } + + private record MessageToSend(ServiceBusMessage Message, string ResourceId); + + public MessagesPerResource[] CreateMessagesToSend(IEnumerable messagePayloads) + { + var dispatches = + ( + from dispatch in messagePayloads + // the same dispatch can be published to several senders + let registrations = _registry.GetDispatchRegistrations(dispatch.Payload.GetType()) + from eventPublicationRegistration in registrations + let message = CreateMessage(eventPublicationRegistration, dispatch) + select new MessageToSend(message, eventPublicationRegistration.Options.ResourceId) + ) + .ToArray(); + + var messagesPerResource = ( + from dispatch in dispatches + group dispatch by dispatch.ResourceId into gr + select new MessagesPerResource(gr.Select(o => o.Message).ToArray(), gr.Key)) + .ToArray(); + + return messagesPerResource; + } + + private ServiceBusMessage CreateMessage( + MessageDispatchRegistration registration, + Abstractions.Dispatch dispatch) + { + var result = _messagePayloadSerializer.SerializeBody(dispatch.Payload); + var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId); + + dispatch.ApplicationProperties.Remove(UserProperties.PayloadTypeIdProperty); + foreach (var dispatchApplicationProperty in dispatch.ApplicationProperties) + { + message.ApplicationProperties[dispatchApplicationProperty.Key] = dispatchApplicationProperty.Value; + } + + message.SessionId = dispatch.SessionId; + + var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString(); + message.CorrelationId = dispatch.CorrelationId ?? originalCorrelationId; + + var originalIsolationKey = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationKey(); + message.SetIsolationKey(originalIsolationKey ?? _serviceBusOptions.Settings.IsolationSettings.IsolationKey); + + var originalIsolationApps = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationApps() ?? []; + message.SetIsolationApps(originalIsolationApps); + + if (dispatch.DiagnosticId != null) + { + message.SetDiagnosticIdIfIsNot(dispatch.DiagnosticId); + } + if (!string.IsNullOrWhiteSpace(dispatch.MessageId)) + { + message.MessageId = dispatch.MessageId; + } + + foreach (var customizer in registration.OutgoingMessageCustomizers) + { + customizer?.Invoke(message, dispatch.Payload); + } + + foreach (var dispatchCustomizer in _dispatchCustomizers) + { + dispatchCustomizer.ExtendDispatch(message, dispatch.Payload); + } + return message; + } +} diff --git a/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs new file mode 100644 index 0000000..8a017b3 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Ev.ServiceBus.Abstractions; +using Ev.ServiceBus.Diagnostics; +using Ev.ServiceBus.Management; + +namespace Ev.ServiceBus.Dispatch; + +public class ServiceBusMessageSender +{ + private const int MaxMessagePerSend = 100; + private readonly ServiceBusRegistry _registry; + + public ServiceBusMessageSender(ServiceBusRegistry registry) + { + _registry = registry; + } + + public async Task SendMessages(string resourceId, ServiceBusMessage[] messages, CancellationToken token) + { + var sender = _registry.GetMessageSender(resourceId); + var batches = await GetBatches(sender, messages, token); + + foreach (var batch in batches) + { + await sender.SendMessagesAsync(batch, token); + batch.Dispose(); + } + } + + private async Task GetBatches( + IMessageSender sender, + ServiceBusMessage[] messages, + CancellationToken token) + { + var batches = new List(); + var batch = await sender.CreateMessageBatchAsync(token); + batches.Add(batch); + foreach (var message in messages) + { + ServiceBusMeter.IncrementSentCounter( + 1, + sender.ClientType.ToString(), + sender.Name, + message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() + ); + + if (batch.TryAddMessage(message)) + { + continue; + } + batch = await sender.CreateMessageBatchAsync(token); + batches.Add(batch); + if (batch.TryAddMessage(message)) + { + continue; + } + + throw new ArgumentOutOfRangeException("A message is too big to fit in a single batch"); + } + + return batches.ToArray(); + } + + public async Task ScheduleMessages( + MessagesPerResource messagesPerResource, + DateTimeOffset scheduledEnqueueTime, + CancellationToken token) + { + var sender = _registry.GetMessageSender(messagesPerResource.ResourceId); + + var pages = messagesPerResource.Messages + .Select((x, i) => new + { + Item = x, + Index = i + }) + .GroupBy(x => x.Index / MaxMessagePerSend, x => x.Item) + .Select(o => o.ToArray()) + .ToArray(); + + foreach (var page in pages) + { + foreach (var message in page) + { + ServiceBusMeter.IncrementSentCounter( + 1, + sender.ClientType.ToString(), + sender.Name, + message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() + ); + } + await sender.ScheduleMessagesAsync(page, scheduledEnqueueTime, token); + } + } +} diff --git a/src/Ev.ServiceBus/Ev.ServiceBus.csproj b/src/Ev.ServiceBus/Ev.ServiceBus.csproj index eba7db4..0ca97c8 100644 --- a/src/Ev.ServiceBus/Ev.ServiceBus.csproj +++ b/src/Ev.ServiceBus/Ev.ServiceBus.csproj @@ -14,6 +14,7 @@ Its goal to is make it the easiest possible to connect and handle an Azure Servi + diff --git a/src/Ev.ServiceBus/Isolation/IsolationService.cs b/src/Ev.ServiceBus/Isolation/IsolationService.cs index cafce2f..d724daa 100644 --- a/src/Ev.ServiceBus/Isolation/IsolationService.cs +++ b/src/Ev.ServiceBus/Isolation/IsolationService.cs @@ -89,7 +89,7 @@ private async Task SendToSourceAsync( var senderInfo = GetSenderResourceId(messageContext); // Try to get existing sender - var sender = _registry.TryGetMessageSender(senderInfo.ClientType, senderInfo.ResourceId); + var sender = _registry.TryGetMessageSender(senderInfo.ResourceId); if (sender != null) { await sender.SendMessageAsync(message, messageContext.CancellationToken); diff --git a/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs b/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs index d38f200..df6164b 100644 --- a/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs +++ b/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs @@ -30,9 +30,9 @@ public MessageSenderFactory( public IMessageSender CreateSender(ClientOptions[] senderOptions) { var options = (IClientOptions)senderOptions.First(); - if (_registry.IsSenderResourceIdTaken(options.ClientType, options.ResourceId)) + if (_registry.IsSenderResourceIdTaken(options.ResourceId)) { - var resourceId = GetNewSenderResourceId(options.ClientType, options.ResourceId); + var resourceId = GetNewSenderResourceId(options.ResourceId); foreach (var sender in senderOptions) { sender.UpdateResourceId(resourceId); @@ -56,7 +56,7 @@ public IMessageSender CreateSender(ClientOptions[] senderOptions) var client = _registry.CreateOrGetServiceBusClient(connectionSettings); var senderClient = client!.CreateSender(options.ResourceId); - _registry.Register(options.ClientType, options.ResourceId, senderClient); + _registry.Register(options.ResourceId, senderClient); var messageSender = new MessageSender(senderClient, options.ResourceId, options.ClientType, _provider.GetRequiredService>()); @@ -70,11 +70,11 @@ public IMessageSender CreateSender(ClientOptions[] senderOptions) } } - private string GetNewSenderResourceId(ClientType clientType, string resourceId) + private string GetNewSenderResourceId(string resourceId) { var newResourceId = resourceId; var suffix = 2; - while (_registry.IsSenderResourceIdTaken(clientType, newResourceId)) + while (_registry.IsSenderResourceIdTaken(newResourceId)) { newResourceId = $"{resourceId}_{suffix}"; ++suffix; diff --git a/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs b/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs index ebfe19a..01e966a 100644 --- a/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs +++ b/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs @@ -50,19 +50,19 @@ public ServiceBusRegistry( return client; } - public IMessageSender? TryGetMessageSender(ClientType clientType, string resourceId) + public IMessageSender? TryGetMessageSender(string resourceId) { - return _messageSenders.GetValueOrDefault(ComputeResourceKey(clientType, resourceId)); + return _messageSenders.GetValueOrDefault(resourceId); } - public IMessageSender GetMessageSender(ClientType clientType, string resourceId) + public IMessageSender GetMessageSender(string resourceId) { - if (_messageSenders.TryGetValue(ComputeResourceKey(clientType, resourceId), out var sender)) + if (_messageSenders.TryGetValue(resourceId, out var sender)) { return sender; } - throw new SenderNotFoundException(clientType, resourceId); + throw new SenderNotFoundException(resourceId); } internal ServiceBusSender[] GetAllSenderClients() @@ -87,7 +87,7 @@ private string ComputeReceptionKey(string payloadTypeId, string receiverName, Cl internal void Register(IMessageSender sender) { - _messageSenders.Add(ComputeResourceKey(sender.ClientType, sender.Name), sender); + _messageSenders.Add(sender.Name, sender); } internal void Register(MessageReceptionRegistration reception) @@ -100,9 +100,9 @@ internal void Register(Type dispatchType, MessageDispatchRegistration[] dispatch _dispatches.Add(dispatchType, dispatches); } - public void Register(ClientType clientType, string resourceId, ServiceBusSender senderClient) + public void Register(string resourceId, ServiceBusSender senderClient) { - _senderClients.Add(ComputeResourceKey(clientType, resourceId), senderClient); + _senderClients.Add(resourceId, senderClient); } public void Register(ClientType clientType, string resourceId, ReceiverWrapper receiverWrapper) @@ -137,9 +137,9 @@ public MessageReceptionRegistration[] GetReceptionRegistrations() return _receptions.Values.ToArray(); } - internal bool IsSenderResourceIdTaken(ClientType clientType, string resourceId) + internal bool IsSenderResourceIdTaken(string resourceId) { - return _senderClients.ContainsKey(ComputeResourceKey(clientType, resourceId)); + return _senderClients.ContainsKey(resourceId); } internal bool IsReceiverResourceIdTaken(ClientType clientType, string resourceId) diff --git a/src/Ev.ServiceBus/ServiceBusBuilder.cs b/src/Ev.ServiceBus/ServiceBusBuilder.cs index cffdef2..099068c 100644 --- a/src/Ev.ServiceBus/ServiceBusBuilder.cs +++ b/src/Ev.ServiceBus/ServiceBusBuilder.cs @@ -1,5 +1,6 @@ using Ev.ServiceBus.Abstractions; using Ev.ServiceBus.Dispatch; +using Ev.ServiceBus.Dispatch.Outbox; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -49,4 +50,14 @@ public ServiceBusBuilder WithPayloadSerializer() Services.Replace(new ServiceDescriptor(typeof(IMessagePayloadSerializer), typeof(TMessagePayloadSerializer), ServiceLifetime.Singleton)); return this; } + + public ServiceBusBuilder WithOutboxIntegration() + where TOutboxRepository : class, IOutboxRepository + { + Services.Decorate(); + Services.Decorate(); + Services.Decorate(); + Services.AddScoped(); + return this; + } } \ No newline at end of file diff --git a/src/Ev.ServiceBus/ServiceCollectionExtensions.cs b/src/Ev.ServiceBus/ServiceCollectionExtensions.cs index 1c30e39..f6aed2c 100644 --- a/src/Ev.ServiceBus/ServiceCollectionExtensions.cs +++ b/src/Ev.ServiceBus/ServiceCollectionExtensions.cs @@ -67,6 +67,8 @@ private static void RegisterMessageDispatchServices(IServiceCollection services) services.TryAddScoped(provider => provider.GetRequiredService()); services.TryAddScoped(provider => provider.GetRequiredService()); services.TryAddScoped(); + services.TryAddScoped(); + services.TryAddScoped(); } private static void RegisterResourceManagementServices(IServiceCollection services) diff --git a/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs b/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs index 4376447..1da5a83 100644 --- a/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs @@ -34,7 +34,7 @@ private async Task ComposeServiceBusAndGetSender() var provider = await composer.Compose(); provider.GetSenderMock("testQueue").Should().BeNull(); - return provider.GetRequiredService().GetMessageSender(ClientType.Queue, "testQueue"); + return provider.GetRequiredService().GetMessageSender("testQueue"); } [Fact] From e1be6f4ecaaede45fff2a20abeac5c3ffca7825b Mon Sep 17 00:00:00 2001 From: Benjamin SPETH Date: Mon, 1 Sep 2025 11:07:53 +0200 Subject: [PATCH 2/3] update --- tests/Ev.ServiceBus.UnitTests/DispatchTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs b/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs index ef65051..47628fc 100644 --- a/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs @@ -437,7 +437,7 @@ public async Task SendDispatch() // Verify var factory = provider.GetRequiredService(); var mock = factory.GetSenderMock("myQueue"); - mock.Mock.Verify(o => o.SendMessageAsync(It.IsAny(), It.IsAny()), Times.Exactly(1)); + mock.Mock.Verify(o => o.SendMessagesAsync(It.IsAny(), It.IsAny()), Times.Exactly(1)); // Dispose await provider.SimulateStopHost(CancellationToken.None); From b98cbfc8d1bd39318040034e7d9d0466210afba3 Mon Sep 17 00:00:00 2001 From: Benjamin SPETH Date: Mon, 3 Nov 2025 14:10:46 +0100 Subject: [PATCH 3/3] update --- docs/CHANGELOG.md | 4 +- .../Ev.ServiceBus.Abstractions.csproj | 9 +--- src/Ev.ServiceBus/Dispatch/DispatchSender.cs | 2 +- .../Dispatch/IServiceBusMessageSender.cs | 17 +++++++ .../Dispatch/Outbox/IOutboxRepository.cs | 12 ----- .../Dispatch/Outbox/IOutboxService.cs | 13 +++++ .../Dispatch/Outbox/OutboxDispatchSender.cs | 12 ++--- .../Outbox/OutboxMessageDispatcher.cs | 11 ++++- .../Dispatch/ServiceBusMessageSender.cs | 49 ++++++++++--------- src/Ev.ServiceBus/Ev.ServiceBus.csproj | 12 ++--- src/Ev.ServiceBus/ServiceBusBuilder.cs | 4 +- 11 files changed, 83 insertions(+), 62 deletions(-) create mode 100644 src/Ev.ServiceBus/Dispatch/IServiceBusMessageSender.cs delete mode 100644 src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs create mode 100644 src/Ev.ServiceBus/Dispatch/Outbox/IOutboxService.cs diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index be0d483..27e9745 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -5,7 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## 5.5.0 -- Added Outbox pattern support +- Added Outbox pattern integration +- Changed + - Optimized sending of messages by sending batches in parallel ## 5.4.0 - Added Isolation feature V2 diff --git a/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj b/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj index 0741c8d..a3fc703 100644 --- a/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj +++ b/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj @@ -1,7 +1,7 @@  - netstandard2.1;net8.0 + net8.0 true MIT @@ -15,12 +15,7 @@ - - - - - - + \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs index 1a12494..afdd22c 100644 --- a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs +++ b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs @@ -91,7 +91,7 @@ public async Task ScheduleDispatches(IEnumerable messageP var dispatches = _messageFactory.CreateMessagesToSend(messagePayloads); foreach (var messagesPerResource in dispatches) { - await _serviceBusMessageSender.ScheduleMessages(messagesPerResource, scheduledEnqueueTime, token); + await _serviceBusMessageSender.ScheduleMessages(messagesPerResource.ResourceId, messagesPerResource.Messages, scheduledEnqueueTime, token); } } } \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/IServiceBusMessageSender.cs b/src/Ev.ServiceBus/Dispatch/IServiceBusMessageSender.cs new file mode 100644 index 0000000..3ce9aa5 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/IServiceBusMessageSender.cs @@ -0,0 +1,17 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Ev.ServiceBus.Dispatch; + +public interface IServiceBusMessageSender +{ + Task SendMessages(string resourceId, ServiceBusMessage[] messages, CancellationToken token); + + Task ScheduleMessages( + string resourceId, + ServiceBusMessage[] messages, + DateTimeOffset scheduledEnqueueTime, + CancellationToken token); +} \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs b/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs deleted file mode 100644 index 646574f..0000000 --- a/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxRepository.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Azure.Messaging.ServiceBus; - -namespace Ev.ServiceBus.Dispatch.Outbox; - -public interface IOutboxRepository -{ - Task Add(string resourceId, ServiceBusMessage message, CancellationToken token); - Task AddScheduled(string resourceId, DateTimeOffset scheduledEnqueueTime, ServiceBusMessage message, CancellationToken token); -} diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxService.cs b/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxService.cs new file mode 100644 index 0000000..70b40de --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxService.cs @@ -0,0 +1,13 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public interface IOutboxService +{ + Task StoreMessage(string resourceId, ServiceBusMessage message, CancellationToken token); + Task StoreScheduledMessage(string resourceId, DateTimeOffset scheduledEnqueueTime, ServiceBusMessage message, CancellationToken token); + Task EagerlySendStoredMessages(CancellationToken token); +} diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs index 391fd32..8a8b9a0 100644 --- a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs @@ -10,16 +10,16 @@ namespace Ev.ServiceBus.Dispatch.Outbox; public class OutboxDispatchSender : IDispatchSender { private readonly IDispatchSender _underlyingDispatchSender; - private readonly IOutboxRepository _outboxRepository; + private readonly IOutboxService _outboxService; private readonly ServiceBusMessageFactory _messageFactory; public OutboxDispatchSender( IDispatchSender underlyingDispatchSender, - IOutboxRepository outboxRepository, + IOutboxService outboxService, ServiceBusMessageFactory serviceBusMessageFactory) { _underlyingDispatchSender = underlyingDispatchSender; - _outboxRepository = outboxRepository; + _outboxService = outboxService; _messageFactory = serviceBusMessageFactory; } @@ -36,7 +36,7 @@ public async Task SendDispatch(Abstractions.Dispatch messagePayload, Cancellatio { foreach (var message in messagesPerResource.Messages) { - await _outboxRepository.Add(messagesPerResource.ResourceId, message, token); + await _outboxService.StoreMessage(messagesPerResource.ResourceId, message, token); } } } @@ -60,7 +60,7 @@ public async Task SendDispatches(IEnumerable messagePaylo { foreach (var message in messagesPerResource.Messages) { - await _outboxRepository.Add(messagesPerResource.ResourceId, message, token); + await _outboxService.StoreMessage(messagesPerResource.ResourceId, message, token); } } } @@ -88,7 +88,7 @@ public async Task ScheduleDispatches( { foreach (var message in messagesPerResource.Messages) { - await _outboxRepository.AddScheduled(messagesPerResource.ResourceId, scheduledEnqueueTime, message, token); + await _outboxService.StoreScheduledMessage(messagesPerResource.ResourceId, scheduledEnqueueTime, message, token); } } } diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs index 2f82203..f641fd8 100644 --- a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs @@ -6,8 +6,15 @@ namespace Ev.ServiceBus.Dispatch.Outbox; public class OutboxMessageDispatcher : IMessageDispatcher { - public Task ExecuteDispatches(CancellationToken token) + private readonly IOutboxService _service; + + public OutboxMessageDispatcher(IOutboxService service) + { + _service = service; + } + + public async Task ExecuteDispatches(CancellationToken token) { - return Task.CompletedTask; + await _service.EagerlySendStoredMessages(token); } } diff --git a/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs index 8a017b3..04bd8ca 100644 --- a/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs +++ b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs @@ -10,7 +10,7 @@ namespace Ev.ServiceBus.Dispatch; -public class ServiceBusMessageSender +public class ServiceBusMessageSender : IServiceBusMessageSender { private const int MaxMessagePerSend = 100; private readonly ServiceBusRegistry _registry; @@ -25,11 +25,13 @@ public async Task SendMessages(string resourceId, ServiceBusMessage[] messages, var sender = _registry.GetMessageSender(resourceId); var batches = await GetBatches(sender, messages, token); - foreach (var batch in batches) - { - await sender.SendMessagesAsync(batch, token); - batch.Dispose(); - } + await Parallel.ForEachAsync(batches, + new ParallelOptions { CancellationToken = token }, + async (batch, ct) => + { + await sender.SendMessagesAsync(batch, ct); + batch.Dispose(); + }); } private async Task GetBatches( @@ -67,13 +69,14 @@ private async Task GetBatches( } public async Task ScheduleMessages( - MessagesPerResource messagesPerResource, + string resourceId, + ServiceBusMessage[] messages, DateTimeOffset scheduledEnqueueTime, CancellationToken token) { - var sender = _registry.GetMessageSender(messagesPerResource.ResourceId); + var sender = _registry.GetMessageSender(resourceId); - var pages = messagesPerResource.Messages + var pages = messages .Select((x, i) => new { Item = x, @@ -83,18 +86,20 @@ public async Task ScheduleMessages( .Select(o => o.ToArray()) .ToArray(); - foreach (var page in pages) - { - foreach (var message in page) + await Parallel.ForEachAsync(pages, + new ParallelOptions { CancellationToken = token }, + async (page, ct) => { - ServiceBusMeter.IncrementSentCounter( - 1, - sender.ClientType.ToString(), - sender.Name, - message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() - ); - } - await sender.ScheduleMessagesAsync(page, scheduledEnqueueTime, token); - } + foreach (var message in page) + { + ServiceBusMeter.IncrementSentCounter( + 1, + sender.ClientType.ToString(), + sender.Name, + message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() + ); + } + await sender.ScheduleMessagesAsync(page, scheduledEnqueueTime, ct); + }); } -} +} \ No newline at end of file diff --git a/src/Ev.ServiceBus/Ev.ServiceBus.csproj b/src/Ev.ServiceBus/Ev.ServiceBus.csproj index 0ca97c8..473bd5b 100644 --- a/src/Ev.ServiceBus/Ev.ServiceBus.csproj +++ b/src/Ev.ServiceBus/Ev.ServiceBus.csproj @@ -1,7 +1,7 @@  - netstandard2.1;net8.0 + net8.0 true MIT This is a wrapper around Microsoft Azure Service Bus @@ -15,18 +15,12 @@ Its goal to is make it the easiest possible to connect and handle an Azure Servi - - - - - - - - + + diff --git a/src/Ev.ServiceBus/ServiceBusBuilder.cs b/src/Ev.ServiceBus/ServiceBusBuilder.cs index 099068c..5f57c5c 100644 --- a/src/Ev.ServiceBus/ServiceBusBuilder.cs +++ b/src/Ev.ServiceBus/ServiceBusBuilder.cs @@ -52,12 +52,12 @@ public ServiceBusBuilder WithPayloadSerializer() } public ServiceBusBuilder WithOutboxIntegration() - where TOutboxRepository : class, IOutboxRepository + where TOutboxRepository : class, IOutboxService { Services.Decorate(); Services.Decorate(); Services.Decorate(); - Services.AddScoped(); + Services.AddScoped(); return this; } } \ No newline at end of file