Simple library for reliable messaging with RabbitMQ (for publishing messages and requests between Microservices).
Build and Release deployment (NuGet) is automated with Azure Devops. Try it, it's free and very powerful.
Via NuGet you can install the NuGet packages Epos.Messaging and Epos.Messaging.RabbitMQ.
$ dotnet add package Epos.Messaging
$ dotnet add package Epos.Messaging.RabbitMQ| Pattern | Meaning | Types |
|---|---|---|
| Integration command | Persistent commands that need to be handled | IIntegrationCommandPublisher IIntegrationCommandSubscriber IntegrationCommand IIntegrationCommandHandler |
| Integration request | Transient request that can timeout and delivers a reply, if successful | IIntegrationRequestPublisher IIntegrationRequestSubscriber IntegrationRequest IntegrationReply IIntegrationRequestHandler |
| Integration event | Transient events that can be handled by multiple subscribers | Not implemented yet |
An Integration Command is durable, persistent and must be handeled by exactly one handler.
// Model and corresponding integration command
public class Note
{
public string Id { get; set; }
public string Text { get; set; }
public string Author { get; set; }
public DateTime Updated { get; set; }
}
public class NoteAddedIntegrationCommand : IntegrationCommand
{
public Note AddedNote { get; set; }
}
// Startup.cs
services.AddIntegrationCommandPublisherRabbitMQ();
services.AddSingleton(RabbitMQOptions.Default); // <- connects to localhost:5672
// NoteController.cs
public class NoteController
{
// ...
public NoteController(IIntegrationCommandPublisher publisher) {
this.publisher = publisher;
}
// ...
[HttpPost]
public async Task<ActionResult> Post(Note note) {
// ...
await this.publisher.PublishAsync(new NoteAddedIntegrationCommand { AddedNote = note });
// ...
}
}The command is published reliably to a persistent RabbitMQ queue.
As long as no command handler is registered, the command is waiting and persisted in a RabbitMQ Queue.
IMPORTANT Do not forget to acknowledge the command after successfully handling the command. Otherwise the command will be redelivered.
// Command handler class
public class NoteAddedCommandHandler : IIntegrationCommandHandler<NoteAddedIntegrationCommand>
{
public Task Handle(NoteAddedIntegrationCommand c, CancellationToken token, CommandHelper h) {
var theMessage = $"Added note '{c.AddedNote.Text}' by {e.AddedNote.Author}.";
Console.WriteLine(theMessage);
h.Ack(); // <- acknowledge command is handled succesfully
return Task.CompletedTask;
}
}
// Startup.cs
services.AddIntegrationCommandSubscriberRabbitMQ();
services.AddSingleton(RabbitMQOptions.Default);
services.AddIntegrationCommandHandler<NoteAddedCommandHandler>();
// App code (e.g. in Program.cs before IHost.Run)
ISubscription subscription = await theSubscriber.SubscribeAsync<NoteAddedIntegrationCommand>();
// ...
// The subscription can be gracefully canceledAn Integration Request is transient and must be handeled by exactly one handler. The handler can timeout,
so be prepared for that.
// Integration request
public class CalculationRequest : IntegrationRequest
{
public int Number { get; set; }
}
// Integration reply
public class CalculationReply : IntegrationReply
{
public int DoubledNumber { get; set; }
}
// Startup.cs
services.AddIntegrationRequestPublisherRabbitMQ();
services.AddSingleton(RabbitMQOptions.Default);
// CalculationController.cs
public class CalculationController
{
// ...
public CalculationController(IIntegrationRequestPublisher publisher) {
this.publisher = publisher;
}
// ...
[HttpPost]
public async Task<ActionResult> Post(CalculationRequest request) {
// ...
try {
CalculationReply reply =
await this.publisher.PublishAsync<CalculationRequest, CalculationReply>(request);
} catch (TimeoutException) {
// Handle timeout
}
// ...
}
}The request is published to a transient RabbitMQ queue.
As long as no request handler is registered, the request will timeout.
// Request handler class
public class CalculationRequestHandler : IIntegrationRequestHandler<CalculationRequest, CalculationReply>
{
public Task<CalculationReply> Handle(CalculationRequest request, CancellationToken token) =>
Task.FromResult(new CalculationReply { DoubledNumber = request.Number * 2 });
}
// Startup.cs
services.AddIntegrationRequestSubscriberRabbitMQ();
services.AddSingleton(RabbitMQOptions.Default);
services.AddIntegrationRequestHandler<CalculationRequestHandler>();
// App code (e.g. in Program.cs before IHost.Run)
ISubscription subscription = await theSubscriber.SubscribeAsync<CalculationRequest, CalculationReply>();
// ...
// The subscription can be canceled
subscription.Cancel();See unit tests in Epos.Messaging.RabbitMQ.Tests.
MIT License
Copyright (c) 2019 eposgmbh
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.