diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 933eb6e..27e9745 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 5.5.0 +- Added Outbox pattern integration +- Changed + - Optimized sending of messages by sending batches in parallel + +## 5.4.0 +- Added Isolation feature V2 + ## 5.3.1 - Changed - Fix usage of sent counter on receiver instead of received counter diff --git a/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj b/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj index 0741c8d..a3fc703 100644 --- a/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj +++ b/src/Ev.ServiceBus.Abstractions/Ev.ServiceBus.Abstractions.csproj @@ -1,7 +1,7 @@  - netstandard2.1;net8.0 + net8.0 true MIT @@ -15,12 +15,7 @@ - - - - - - + \ No newline at end of file diff --git a/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs b/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs index 6458e77..5419593 100644 --- a/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs +++ b/src/Ev.ServiceBus.Abstractions/Exceptions/SenderNotFoundException.cs @@ -5,13 +5,13 @@ namespace Ev.ServiceBus.Abstractions; [Serializable] public class SenderNotFoundException : Exception { - public SenderNotFoundException(ClientType clientType, string topicName) + public SenderNotFoundException(string resourceId) : base( - $"The {clientType.ToString()} '{topicName}' you tried to retrieve was not found. " - + $"Verify your configuration to make sure the {clientType.ToString()} is properly registered.") + $"The '{resourceId}' you tried to retrieve was not found. " + + $"Verify your configuration to make sure the resource is properly registered.") { - TopicName = topicName; + ResourceId = resourceId; } - public string TopicName { get; } + public string ResourceId { get; } } \ No newline at end of file diff --git a/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs b/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs index 37f7aac..6ec1f3c 100644 --- a/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs +++ b/src/Ev.ServiceBus.Abstractions/IMessagePublisher.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; namespace Ev.ServiceBus.Abstractions; @@ -9,7 +10,7 @@ public interface IMessagePublisher /// /// The object to send through Service Bus /// A type of object that is registered within Ev.ServiceBus - void Publish(TMessagePayload messageDto); + Task Publish(TMessagePayload messageDto); /// /// Temporarily stores the object to send through Service Bus until is called. @@ -17,7 +18,7 @@ public interface IMessagePublisher /// The object to send through Service Bus /// The sessionId to attach to the outgoing message /// A type of object that is registered within Ev.ServiceBus - void Publish(TMessagePayload messageDto, string sessionId); + Task Publish(TMessagePayload messageDto, string sessionId); /// /// Temporarily stores the object to send through Service Bus until is called. @@ -25,5 +26,5 @@ public interface IMessagePublisher /// The object to send through Service Bus /// Configurator of message context /// A type of object that is registered within Ev.ServiceBus - void Publish(TMessagePayload messageDto, Action messageContextConfiguration); + Task Publish(TMessagePayload messageDto, Action messageContextConfiguration); } \ No newline at end of file diff --git a/src/Ev.ServiceBus.Abstractions/IsExternalInit.cs b/src/Ev.ServiceBus.Abstractions/IsExternalInit.cs new file mode 100644 index 0000000..37f9b76 --- /dev/null +++ b/src/Ev.ServiceBus.Abstractions/IsExternalInit.cs @@ -0,0 +1,11 @@ +using System.ComponentModel; + +namespace System.Runtime.CompilerServices; + +/// +/// Reserved to be used by the compiler for tracking metadata. +/// This class should not be used by developers in source code. +/// +[EditorBrowsable(EditorBrowsableState.Never)] +internal static class IsExternalInit { +} diff --git a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs index 480402a..afdd22c 100644 --- a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs +++ b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs @@ -3,40 +3,21 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Azure.Messaging.ServiceBus; using Ev.ServiceBus.Abstractions; -using Ev.ServiceBus.Abstractions.Extensions; -using Ev.ServiceBus.Abstractions.MessageReception; -using Ev.ServiceBus.Diagnostics; -using Ev.ServiceBus.Management; -using Microsoft.Extensions.Options; namespace Ev.ServiceBus.Dispatch; public class DispatchSender : IDispatchSender { - private const int MaxMessagePerSend = 100; - private readonly IMessagePayloadSerializer _messagePayloadSerializer; - private readonly ServiceBusRegistry _dispatchRegistry; - private readonly ServiceBusRegistry _registry; - private readonly IMessageMetadataAccessor _messageMetadataAccessor; - private readonly IEnumerable _dispatchCustomizers; - private readonly ServiceBusOptions _serviceBusOptions; + private readonly ServiceBusMessageFactory _messageFactory; + private readonly ServiceBusMessageSender _serviceBusMessageSender; public DispatchSender( - ServiceBusRegistry registry, - IMessagePayloadSerializer messagePayloadSerializer, - ServiceBusRegistry dispatchRegistry, - IMessageMetadataAccessor messageMetadataAccessor, - IEnumerable dispatchCustomizers, - IOptions serviceBusOptions) + ServiceBusMessageFactory messageFactory, + ServiceBusMessageSender serviceBusMessageSender) { - _registry = registry; - _messagePayloadSerializer = messagePayloadSerializer; - _dispatchRegistry = dispatchRegistry; - _messageMetadataAccessor = messageMetadataAccessor; - _dispatchCustomizers = dispatchCustomizers; - _serviceBusOptions = serviceBusOptions.Value; + _messageFactory = messageFactory; + _serviceBusMessageSender = serviceBusMessageSender; } /// @@ -50,20 +31,14 @@ public async Task SendDispatch(object messagePayload, CancellationToken token = /// public async Task SendDispatch(Abstractions.Dispatch messagePayload, CancellationToken token = default) { - var dispatches = CreateMessagesToSend([messagePayload]); + var dispatches = _messageFactory.CreateMessagesToSend([messagePayload]); - foreach (var messagePerResource in dispatches) - { - var message = messagePerResource.Messages.Single(); + var messagePerResource = dispatches.Single(); - await messagePerResource.Sender.SendMessageAsync(message.Message, token); - ServiceBusMeter.IncrementSentCounter( - 1, - messagePerResource.Sender.ClientType.ToString(), - messagePerResource.Sender.Name, - message.Message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() - ); - } + await _serviceBusMessageSender.SendMessages( + messagePerResource.ResourceId, + messagePerResource.Messages, + token); } /// @@ -86,48 +61,10 @@ public async Task SendDispatches(IEnumerable messagePaylo throw new ArgumentNullException(nameof(messagePayloads)); } - var dispatches = CreateMessagesToSend(messagePayloads); + var dispatches = _messageFactory.CreateMessagesToSend(messagePayloads); foreach (var messagesPerResource in dispatches) { - await BatchAndSendMessages(messagesPerResource, token, async (sender, batch) => - { - await sender.SendMessagesAsync(batch, token); - }); - } - } - - private async Task BatchAndSendMessages(MessagesPerResource dispatches, CancellationToken token, Func senderAction) - { - var batches = new List(); - var batch = await dispatches.Sender.CreateMessageBatchAsync(token); - batches.Add(batch); - foreach (var messageToSend in dispatches.Messages) - { - ServiceBusMeter.IncrementSentCounter( - 1, - dispatches.Sender.ClientType.ToString(), - dispatches.Sender.Name, - messageToSend.Message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() - ); - - if (batch.TryAddMessage(messageToSend.Message)) - { - continue; - } - batch = await dispatches.Sender.CreateMessageBatchAsync(token); - batches.Add(batch); - if (batch.TryAddMessage(messageToSend.Message)) - { - continue; - } - - throw new ArgumentOutOfRangeException("A message is too big to fit in a single batch"); - } - - foreach (var pageMessages in batches) - { - await senderAction.Invoke(dispatches.Sender, pageMessages); - pageMessages.Dispose(); + await _serviceBusMessageSender.SendMessages(messagesPerResource.ResourceId, messagesPerResource.Messages, token); } } @@ -151,132 +88,10 @@ public async Task ScheduleDispatches(IEnumerable messageP throw new ArgumentNullException(nameof(messagePayloads)); } - var dispatches = CreateMessagesToSend(messagePayloads); + var dispatches = _messageFactory.CreateMessagesToSend(messagePayloads); foreach (var messagesPerResource in dispatches) { - await PaginateAndSendMessages(messagesPerResource, async (sender, page) => - { - await sender.ScheduleMessagesAsync(page, scheduledEnqueueTime, token); - }); - } - } - - private async Task PaginateAndSendMessages(MessagesPerResource dispatches, Func, Task> senderAction) - { - var paginatedMessages = dispatches.Messages.Select(o => o.Message) - .Select((x, i) => new - { - Item = x, - Index = i - }) - .GroupBy(x => x.Index / MaxMessagePerSend, x => x.Item); - - foreach (var pageMessages in paginatedMessages) - { - foreach (var message in pageMessages) - { - ServiceBusMeter.IncrementSentCounter( - 1, - dispatches.Sender.ClientType.ToString(), - dispatches.Sender.Name, - message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() - ); - } - - await senderAction.Invoke(dispatches.Sender, pageMessages.Select(m => m).ToArray()); - } - } - - private class MessagesPerResource - { - public MessageToSend[] Messages { get; set; } - public ClientType ClientType { get; set; } - public string ResourceId { get; set; } - public IMessageSender Sender { get; set; } - } - - private class MessageToSend - { - public MessageToSend(ServiceBusMessage message, MessageDispatchRegistration registration) - { - Message = message; - Registration = registration; - } - - public ServiceBusMessage Message { get; } - public MessageDispatchRegistration Registration { get; } - } - - private MessagesPerResource[] CreateMessagesToSend(IEnumerable messagePayloads) - { - var dispatches = - ( - from dispatch in messagePayloads - // the same dispatch can be published to several senders - let registrations = _dispatchRegistry.GetDispatchRegistrations(dispatch.Payload.GetType()) - from eventPublicationRegistration in registrations - let message = CreateMessage(eventPublicationRegistration, dispatch) - select new MessageToSend(message, eventPublicationRegistration) - ) - .ToArray(); - - var messagesPerResource = ( - from dispatch in dispatches - group dispatch by new { dispatch.Registration.Options.ClientType, dispatch.Registration.Options.ResourceId } into gr - let sender = _registry.GetMessageSender(gr.Key.ClientType, gr.Key.ResourceId) - select new MessagesPerResource() - { - Messages = gr.ToArray(), - ClientType = gr.Key.ClientType, - ResourceId = gr.Key.ResourceId, - Sender = sender - }).ToArray(); - - return messagesPerResource; - } - - private ServiceBusMessage CreateMessage( - MessageDispatchRegistration registration, - Abstractions.Dispatch dispatch) - { - var result = _messagePayloadSerializer.SerializeBody(dispatch.Payload); - var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId); - - dispatch.ApplicationProperties.Remove(UserProperties.PayloadTypeIdProperty); - foreach (var dispatchApplicationProperty in dispatch.ApplicationProperties) - { - message.ApplicationProperties[dispatchApplicationProperty.Key] = dispatchApplicationProperty.Value; - } - - message.SessionId = dispatch.SessionId; - - var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString(); - message.CorrelationId = dispatch.CorrelationId ?? originalCorrelationId; - - var originalIsolationKey = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationKey(); - message.SetIsolationKey(originalIsolationKey ?? _serviceBusOptions.Settings.IsolationSettings.IsolationKey); - - var originalIsolationApps = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationApps() ?? []; - message.SetIsolationApps(originalIsolationApps); - - if (dispatch.DiagnosticId != null) - { - message.SetDiagnosticIdIfIsNot(dispatch.DiagnosticId); - } - if (!string.IsNullOrWhiteSpace(dispatch.MessageId)) - { - message.MessageId = dispatch.MessageId; - } - - foreach (var customizer in registration.OutgoingMessageCustomizers) - { - customizer?.Invoke(message, dispatch.Payload); - } - - foreach (var dispatchCustomizer in _dispatchCustomizers) - { - dispatchCustomizer.ExtendDispatch(message, dispatch.Payload); + await _serviceBusMessageSender.ScheduleMessages(messagesPerResource.ResourceId, messagesPerResource.Messages, scheduledEnqueueTime, token); } - return message; } } \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/IServiceBusMessageSender.cs b/src/Ev.ServiceBus/Dispatch/IServiceBusMessageSender.cs new file mode 100644 index 0000000..3ce9aa5 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/IServiceBusMessageSender.cs @@ -0,0 +1,17 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Ev.ServiceBus.Dispatch; + +public interface IServiceBusMessageSender +{ + Task SendMessages(string resourceId, ServiceBusMessage[] messages, CancellationToken token); + + Task ScheduleMessages( + string resourceId, + ServiceBusMessage[] messages, + DateTimeOffset scheduledEnqueueTime, + CancellationToken token); +} \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs b/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs index 89552a3..0bb5796 100644 --- a/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs +++ b/src/Ev.ServiceBus/Dispatch/MessageDispatcher.cs @@ -31,7 +31,7 @@ public async Task ExecuteDispatches(CancellationToken token) } /// - public void Publish(TMessageDto messageDto) + public Task Publish(TMessageDto messageDto) { if (messageDto == null) { @@ -42,10 +42,12 @@ public void Publish(TMessageDto messageDto) { DiagnosticId = Activity.Current?.Id }); + + return Task.CompletedTask; } /// - public void Publish(TMessagePayload messageDto, string sessionId) + public Task Publish(TMessagePayload messageDto, string sessionId) { if (messageDto == null) { @@ -62,10 +64,12 @@ public void Publish(TMessagePayload messageDto, string sessionI SessionId = sessionId, DiagnosticId = Activity.Current?.Id }); + + return Task.CompletedTask; } /// - public void Publish( + public Task Publish( TMessagePayload messageDto, Action messageContextConfiguration) { @@ -90,5 +94,7 @@ public void Publish( MessageId = context.MessageId, DiagnosticId = context.DiagnosticId ?? Activity.Current?.Id }); + + return Task.CompletedTask; } } \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxService.cs b/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxService.cs new file mode 100644 index 0000000..70b40de --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/IOutboxService.cs @@ -0,0 +1,13 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public interface IOutboxService +{ + Task StoreMessage(string resourceId, ServiceBusMessage message, CancellationToken token); + Task StoreScheduledMessage(string resourceId, DateTimeOffset scheduledEnqueueTime, ServiceBusMessage message, CancellationToken token); + Task EagerlySendStoredMessages(CancellationToken token); +} diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs new file mode 100644 index 0000000..8a8b9a0 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxDispatchSender.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Ev.ServiceBus.Abstractions; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public class OutboxDispatchSender : IDispatchSender +{ + private readonly IDispatchSender _underlyingDispatchSender; + private readonly IOutboxService _outboxService; + private readonly ServiceBusMessageFactory _messageFactory; + + public OutboxDispatchSender( + IDispatchSender underlyingDispatchSender, + IOutboxService outboxService, + ServiceBusMessageFactory serviceBusMessageFactory) + { + _underlyingDispatchSender = underlyingDispatchSender; + _outboxService = outboxService; + _messageFactory = serviceBusMessageFactory; + } + + public async Task SendDispatch(object messagePayload, CancellationToken token = default) + { + await SendDispatch(new Abstractions.Dispatch(messagePayload), token); + } + + public async Task SendDispatch(Abstractions.Dispatch messagePayload, CancellationToken token = default) + { + var messagesPerResources = _messageFactory.CreateMessagesToSend([messagePayload]); + + foreach (var messagesPerResource in messagesPerResources) + { + foreach (var message in messagesPerResource.Messages) + { + await _outboxService.StoreMessage(messagesPerResource.ResourceId, message, token); + } + } + } + + public async Task SendDispatches(IEnumerable messagePayloads, CancellationToken token = default) + { + if (messagePayloads == null) + { + throw new ArgumentNullException(nameof(messagePayloads)); + } + + var dispatches = messagePayloads.Select(o => new Abstractions.Dispatch(o)).ToArray(); + await SendDispatches(dispatches, token); + } + + public async Task SendDispatches(IEnumerable messagePayloads, CancellationToken token = default) + { + var messagesPerResources = _messageFactory.CreateMessagesToSend(messagePayloads); + + foreach (var messagesPerResource in messagesPerResources) + { + foreach (var message in messagesPerResource.Messages) + { + await _outboxService.StoreMessage(messagesPerResource.ResourceId, message, token); + } + } + } + + public async Task ScheduleDispatches(IEnumerable messagePayloads, DateTimeOffset scheduledEnqueueTime, + CancellationToken token = default) + { + if (messagePayloads == null) + { + throw new ArgumentNullException(nameof(messagePayloads)); + } + + var dispatches = messagePayloads.Select(o => new Abstractions.Dispatch(o)).ToArray(); + await ScheduleDispatches(dispatches, scheduledEnqueueTime, token); + } + + public async Task ScheduleDispatches( + IEnumerable messagePayloads, + DateTimeOffset scheduledEnqueueTime, + CancellationToken token = default) + { + var messagesPerResources = _messageFactory.CreateMessagesToSend(messagePayloads); + + foreach (var messagesPerResource in messagesPerResources) + { + foreach (var message in messagesPerResource.Messages) + { + await _outboxService.StoreScheduledMessage(messagesPerResource.ResourceId, scheduledEnqueueTime, message, token); + } + } + } +} \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs new file mode 100644 index 0000000..f641fd8 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessageDispatcher.cs @@ -0,0 +1,20 @@ +using System.Threading; +using System.Threading.Tasks; +using Ev.ServiceBus.Abstractions; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public class OutboxMessageDispatcher : IMessageDispatcher +{ + private readonly IOutboxService _service; + + public OutboxMessageDispatcher(IOutboxService service) + { + _service = service; + } + + public async Task ExecuteDispatches(CancellationToken token) + { + await _service.EagerlySendStoredMessages(token); + } +} diff --git a/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessagePublisher.cs b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessagePublisher.cs new file mode 100644 index 0000000..8732160 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/Outbox/OutboxMessagePublisher.cs @@ -0,0 +1,75 @@ +using System; +using System.Diagnostics; +using System.Threading.Tasks; +using Ev.ServiceBus.Abstractions; + +namespace Ev.ServiceBus.Dispatch.Outbox; + +public class OutboxMessagePublisher : IMessagePublisher +{ + private readonly IMessagePublisher _underlyingMessagePublisher; + private readonly IDispatchSender _dispatchSender; + + public OutboxMessagePublisher( + IMessagePublisher underlyingMessagePublisher, + IDispatchSender dispatchSender) + { + _underlyingMessagePublisher = underlyingMessagePublisher; + _dispatchSender = dispatchSender; + } + + public async Task Publish(TMessagePayload messageDto) + { + if (messageDto == null) + { + throw new ArgumentNullException(nameof(messageDto)); + } + + await _dispatchSender.SendDispatch(messageDto!); + } + + public async Task Publish(TMessagePayload messageDto, string sessionId) + { + if (messageDto == null) + { + throw new ArgumentNullException(nameof(messageDto)); + } + + if (sessionId == null) + { + throw new ArgumentNullException(nameof(sessionId)); + } + + await _dispatchSender.SendDispatch(new Abstractions.Dispatch(messageDto) + { + SessionId = sessionId, + DiagnosticId = Activity.Current?.Id + }); + } + + public async Task Publish(TMessagePayload messageDto, Action messageContextConfiguration) + { + if (messageDto == null) + { + throw new ArgumentNullException(nameof(messageDto)); + } + + if (messageContextConfiguration == null) + { + throw new ArgumentNullException(nameof(messageContextConfiguration)); + } + + var context = new DispatchContext(); + + messageContextConfiguration.Invoke(context); + + var dispatch = new Abstractions.Dispatch(messageDto, context) + { + SessionId = context.SessionId, + CorrelationId = context.CorrelationId, + MessageId = context.MessageId, + DiagnosticId = context.DiagnosticId ?? Activity.Current?.Id + }; + await _dispatchSender.SendDispatch(dispatch); + } +} diff --git a/src/Ev.ServiceBus/Dispatch/ServiceBusMessageFactory.cs b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageFactory.cs new file mode 100644 index 0000000..21f77c5 --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageFactory.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Azure.Messaging.ServiceBus; +using Ev.ServiceBus.Abstractions; +using Ev.ServiceBus.Abstractions.Extensions; +using Ev.ServiceBus.Abstractions.MessageReception; +using Ev.ServiceBus.Management; +using Microsoft.Extensions.Options; + +namespace Ev.ServiceBus.Dispatch; + +public record MessagesPerResource(ServiceBusMessage[] Messages, string ResourceId); + +public class ServiceBusMessageFactory +{ + private readonly ServiceBusRegistry _registry; + private readonly IMessagePayloadSerializer _messagePayloadSerializer; + private readonly IMessageMetadataAccessor _messageMetadataAccessor; + private readonly IEnumerable _dispatchCustomizers; + private readonly ServiceBusOptions _serviceBusOptions; + + public ServiceBusMessageFactory( + ServiceBusRegistry serviceBusRegistry, + IMessagePayloadSerializer messagePayloadSerializer, + IMessageMetadataAccessor messageMetadataAccessor, + IEnumerable dispatchCustomizers, + IOptions serviceBusOptions + ) + { + _registry = serviceBusRegistry; + _messagePayloadSerializer = messagePayloadSerializer; + _messageMetadataAccessor = messageMetadataAccessor; + _dispatchCustomizers = dispatchCustomizers; + _serviceBusOptions = serviceBusOptions.Value; + } + + private record MessageToSend(ServiceBusMessage Message, string ResourceId); + + public MessagesPerResource[] CreateMessagesToSend(IEnumerable messagePayloads) + { + var dispatches = + ( + from dispatch in messagePayloads + // the same dispatch can be published to several senders + let registrations = _registry.GetDispatchRegistrations(dispatch.Payload.GetType()) + from eventPublicationRegistration in registrations + let message = CreateMessage(eventPublicationRegistration, dispatch) + select new MessageToSend(message, eventPublicationRegistration.Options.ResourceId) + ) + .ToArray(); + + var messagesPerResource = ( + from dispatch in dispatches + group dispatch by dispatch.ResourceId into gr + select new MessagesPerResource(gr.Select(o => o.Message).ToArray(), gr.Key)) + .ToArray(); + + return messagesPerResource; + } + + private ServiceBusMessage CreateMessage( + MessageDispatchRegistration registration, + Abstractions.Dispatch dispatch) + { + var result = _messagePayloadSerializer.SerializeBody(dispatch.Payload); + var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId); + + dispatch.ApplicationProperties.Remove(UserProperties.PayloadTypeIdProperty); + foreach (var dispatchApplicationProperty in dispatch.ApplicationProperties) + { + message.ApplicationProperties[dispatchApplicationProperty.Key] = dispatchApplicationProperty.Value; + } + + message.SessionId = dispatch.SessionId; + + var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString(); + message.CorrelationId = dispatch.CorrelationId ?? originalCorrelationId; + + var originalIsolationKey = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationKey(); + message.SetIsolationKey(originalIsolationKey ?? _serviceBusOptions.Settings.IsolationSettings.IsolationKey); + + var originalIsolationApps = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationApps() ?? []; + message.SetIsolationApps(originalIsolationApps); + + if (dispatch.DiagnosticId != null) + { + message.SetDiagnosticIdIfIsNot(dispatch.DiagnosticId); + } + if (!string.IsNullOrWhiteSpace(dispatch.MessageId)) + { + message.MessageId = dispatch.MessageId; + } + + foreach (var customizer in registration.OutgoingMessageCustomizers) + { + customizer?.Invoke(message, dispatch.Payload); + } + + foreach (var dispatchCustomizer in _dispatchCustomizers) + { + dispatchCustomizer.ExtendDispatch(message, dispatch.Payload); + } + return message; + } +} diff --git a/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs new file mode 100644 index 0000000..04bd8ca --- /dev/null +++ b/src/Ev.ServiceBus/Dispatch/ServiceBusMessageSender.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Ev.ServiceBus.Abstractions; +using Ev.ServiceBus.Diagnostics; +using Ev.ServiceBus.Management; + +namespace Ev.ServiceBus.Dispatch; + +public class ServiceBusMessageSender : IServiceBusMessageSender +{ + private const int MaxMessagePerSend = 100; + private readonly ServiceBusRegistry _registry; + + public ServiceBusMessageSender(ServiceBusRegistry registry) + { + _registry = registry; + } + + public async Task SendMessages(string resourceId, ServiceBusMessage[] messages, CancellationToken token) + { + var sender = _registry.GetMessageSender(resourceId); + var batches = await GetBatches(sender, messages, token); + + await Parallel.ForEachAsync(batches, + new ParallelOptions { CancellationToken = token }, + async (batch, ct) => + { + await sender.SendMessagesAsync(batch, ct); + batch.Dispose(); + }); + } + + private async Task GetBatches( + IMessageSender sender, + ServiceBusMessage[] messages, + CancellationToken token) + { + var batches = new List(); + var batch = await sender.CreateMessageBatchAsync(token); + batches.Add(batch); + foreach (var message in messages) + { + ServiceBusMeter.IncrementSentCounter( + 1, + sender.ClientType.ToString(), + sender.Name, + message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() + ); + + if (batch.TryAddMessage(message)) + { + continue; + } + batch = await sender.CreateMessageBatchAsync(token); + batches.Add(batch); + if (batch.TryAddMessage(message)) + { + continue; + } + + throw new ArgumentOutOfRangeException("A message is too big to fit in a single batch"); + } + + return batches.ToArray(); + } + + public async Task ScheduleMessages( + string resourceId, + ServiceBusMessage[] messages, + DateTimeOffset scheduledEnqueueTime, + CancellationToken token) + { + var sender = _registry.GetMessageSender(resourceId); + + var pages = messages + .Select((x, i) => new + { + Item = x, + Index = i + }) + .GroupBy(x => x.Index / MaxMessagePerSend, x => x.Item) + .Select(o => o.ToArray()) + .ToArray(); + + await Parallel.ForEachAsync(pages, + new ParallelOptions { CancellationToken = token }, + async (page, ct) => + { + foreach (var message in page) + { + ServiceBusMeter.IncrementSentCounter( + 1, + sender.ClientType.ToString(), + sender.Name, + message.ApplicationProperties[UserProperties.PayloadTypeIdProperty]?.ToString() + ); + } + await sender.ScheduleMessagesAsync(page, scheduledEnqueueTime, ct); + }); + } +} \ No newline at end of file diff --git a/src/Ev.ServiceBus/Ev.ServiceBus.csproj b/src/Ev.ServiceBus/Ev.ServiceBus.csproj index eba7db4..473bd5b 100644 --- a/src/Ev.ServiceBus/Ev.ServiceBus.csproj +++ b/src/Ev.ServiceBus/Ev.ServiceBus.csproj @@ -1,7 +1,7 @@  - netstandard2.1;net8.0 + net8.0 true MIT This is a wrapper around Microsoft Azure Service Bus @@ -14,18 +14,13 @@ Its goal to is make it the easiest possible to connect and handle an Azure Servi - - - - - - - - + + + diff --git a/src/Ev.ServiceBus/Isolation/IsolationService.cs b/src/Ev.ServiceBus/Isolation/IsolationService.cs index cafce2f..d724daa 100644 --- a/src/Ev.ServiceBus/Isolation/IsolationService.cs +++ b/src/Ev.ServiceBus/Isolation/IsolationService.cs @@ -89,7 +89,7 @@ private async Task SendToSourceAsync( var senderInfo = GetSenderResourceId(messageContext); // Try to get existing sender - var sender = _registry.TryGetMessageSender(senderInfo.ClientType, senderInfo.ResourceId); + var sender = _registry.TryGetMessageSender(senderInfo.ResourceId); if (sender != null) { await sender.SendMessageAsync(message, messageContext.CancellationToken); diff --git a/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs b/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs index d38f200..df6164b 100644 --- a/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs +++ b/src/Ev.ServiceBus/Management/Factories/MessageSenderFactory.cs @@ -30,9 +30,9 @@ public MessageSenderFactory( public IMessageSender CreateSender(ClientOptions[] senderOptions) { var options = (IClientOptions)senderOptions.First(); - if (_registry.IsSenderResourceIdTaken(options.ClientType, options.ResourceId)) + if (_registry.IsSenderResourceIdTaken(options.ResourceId)) { - var resourceId = GetNewSenderResourceId(options.ClientType, options.ResourceId); + var resourceId = GetNewSenderResourceId(options.ResourceId); foreach (var sender in senderOptions) { sender.UpdateResourceId(resourceId); @@ -56,7 +56,7 @@ public IMessageSender CreateSender(ClientOptions[] senderOptions) var client = _registry.CreateOrGetServiceBusClient(connectionSettings); var senderClient = client!.CreateSender(options.ResourceId); - _registry.Register(options.ClientType, options.ResourceId, senderClient); + _registry.Register(options.ResourceId, senderClient); var messageSender = new MessageSender(senderClient, options.ResourceId, options.ClientType, _provider.GetRequiredService>()); @@ -70,11 +70,11 @@ public IMessageSender CreateSender(ClientOptions[] senderOptions) } } - private string GetNewSenderResourceId(ClientType clientType, string resourceId) + private string GetNewSenderResourceId(string resourceId) { var newResourceId = resourceId; var suffix = 2; - while (_registry.IsSenderResourceIdTaken(clientType, newResourceId)) + while (_registry.IsSenderResourceIdTaken(newResourceId)) { newResourceId = $"{resourceId}_{suffix}"; ++suffix; diff --git a/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs b/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs index ebfe19a..01e966a 100644 --- a/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs +++ b/src/Ev.ServiceBus/Management/ServiceBusRegistry.cs @@ -50,19 +50,19 @@ public ServiceBusRegistry( return client; } - public IMessageSender? TryGetMessageSender(ClientType clientType, string resourceId) + public IMessageSender? TryGetMessageSender(string resourceId) { - return _messageSenders.GetValueOrDefault(ComputeResourceKey(clientType, resourceId)); + return _messageSenders.GetValueOrDefault(resourceId); } - public IMessageSender GetMessageSender(ClientType clientType, string resourceId) + public IMessageSender GetMessageSender(string resourceId) { - if (_messageSenders.TryGetValue(ComputeResourceKey(clientType, resourceId), out var sender)) + if (_messageSenders.TryGetValue(resourceId, out var sender)) { return sender; } - throw new SenderNotFoundException(clientType, resourceId); + throw new SenderNotFoundException(resourceId); } internal ServiceBusSender[] GetAllSenderClients() @@ -87,7 +87,7 @@ private string ComputeReceptionKey(string payloadTypeId, string receiverName, Cl internal void Register(IMessageSender sender) { - _messageSenders.Add(ComputeResourceKey(sender.ClientType, sender.Name), sender); + _messageSenders.Add(sender.Name, sender); } internal void Register(MessageReceptionRegistration reception) @@ -100,9 +100,9 @@ internal void Register(Type dispatchType, MessageDispatchRegistration[] dispatch _dispatches.Add(dispatchType, dispatches); } - public void Register(ClientType clientType, string resourceId, ServiceBusSender senderClient) + public void Register(string resourceId, ServiceBusSender senderClient) { - _senderClients.Add(ComputeResourceKey(clientType, resourceId), senderClient); + _senderClients.Add(resourceId, senderClient); } public void Register(ClientType clientType, string resourceId, ReceiverWrapper receiverWrapper) @@ -137,9 +137,9 @@ public MessageReceptionRegistration[] GetReceptionRegistrations() return _receptions.Values.ToArray(); } - internal bool IsSenderResourceIdTaken(ClientType clientType, string resourceId) + internal bool IsSenderResourceIdTaken(string resourceId) { - return _senderClients.ContainsKey(ComputeResourceKey(clientType, resourceId)); + return _senderClients.ContainsKey(resourceId); } internal bool IsReceiverResourceIdTaken(ClientType clientType, string resourceId) diff --git a/src/Ev.ServiceBus/ServiceBusBuilder.cs b/src/Ev.ServiceBus/ServiceBusBuilder.cs index cffdef2..5f57c5c 100644 --- a/src/Ev.ServiceBus/ServiceBusBuilder.cs +++ b/src/Ev.ServiceBus/ServiceBusBuilder.cs @@ -1,5 +1,6 @@ using Ev.ServiceBus.Abstractions; using Ev.ServiceBus.Dispatch; +using Ev.ServiceBus.Dispatch.Outbox; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -49,4 +50,14 @@ public ServiceBusBuilder WithPayloadSerializer() Services.Replace(new ServiceDescriptor(typeof(IMessagePayloadSerializer), typeof(TMessagePayloadSerializer), ServiceLifetime.Singleton)); return this; } + + public ServiceBusBuilder WithOutboxIntegration() + where TOutboxRepository : class, IOutboxService + { + Services.Decorate(); + Services.Decorate(); + Services.Decorate(); + Services.AddScoped(); + return this; + } } \ No newline at end of file diff --git a/src/Ev.ServiceBus/ServiceCollectionExtensions.cs b/src/Ev.ServiceBus/ServiceCollectionExtensions.cs index 1c30e39..f6aed2c 100644 --- a/src/Ev.ServiceBus/ServiceCollectionExtensions.cs +++ b/src/Ev.ServiceBus/ServiceCollectionExtensions.cs @@ -67,6 +67,8 @@ private static void RegisterMessageDispatchServices(IServiceCollection services) services.TryAddScoped(provider => provider.GetRequiredService()); services.TryAddScoped(provider => provider.GetRequiredService()); services.TryAddScoped(); + services.TryAddScoped(); + services.TryAddScoped(); } private static void RegisterResourceManagementServices(IServiceCollection services) diff --git a/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs b/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs index 4376447..1da5a83 100644 --- a/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs @@ -34,7 +34,7 @@ private async Task ComposeServiceBusAndGetSender() var provider = await composer.Compose(); provider.GetSenderMock("testQueue").Should().BeNull(); - return provider.GetRequiredService().GetMessageSender(ClientType.Queue, "testQueue"); + return provider.GetRequiredService().GetMessageSender("testQueue"); } [Fact] diff --git a/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs b/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs index ef65051..47628fc 100644 --- a/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs @@ -437,7 +437,7 @@ public async Task SendDispatch() // Verify var factory = provider.GetRequiredService(); var mock = factory.GetSenderMock("myQueue"); - mock.Mock.Verify(o => o.SendMessageAsync(It.IsAny(), It.IsAny()), Times.Exactly(1)); + mock.Mock.Verify(o => o.SendMessagesAsync(It.IsAny(), It.IsAny()), Times.Exactly(1)); // Dispose await provider.SimulateStopHost(CancellationToken.None);