Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ The `InboxAndOutbox` is the main section for setting of the Outbox and Inbox fun

```
"InboxAndOutbox": {
"SecondsToDelayBeforeCreateEventStoreTables": 0,
"Inbox": {
//Your inbox settings
},
Expand All @@ -371,7 +372,6 @@ The `InboxAndOutbox` is the main section for setting of the Outbox and Inbox fun
"TryAfterMinutes": 20,
"TryAfterMinutesIfEventNotFound": 60,
"SecondsToDelayProcessEvents": 2,
"SecondsToDelayBeforeCreateEventStoreTables": 0,
"DaysToCleanUpEvents": 30,
"HoursToDelayCleanUpEvents": 2,
"ConnectionString": "Connection string of the SQL database"
Expand Down
59 changes: 30 additions & 29 deletions src/BackgroundServices/BaseCleanUpProcessedEventsJob.cs
Original file line number Diff line number Diff line change
@@ -1,61 +1,62 @@
using EventStorage.Configurations;
using EventStorage.Models;
using EventStorage.Repositories;
using EventStorage.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace EventStorage.BackgroundServices;

internal abstract class BaseCleanUpProcessedEventsJob<TEventRepository, TEventBox> : BackgroundService
internal abstract class BaseCleanUpProcessedEventsJob<TEventRepository, TEventBox>(
IServiceScopeFactory scopeFactory,
InboxOrOutboxStructure settings,
ILogger logger)
: BackgroundService
where TEventBox : class, IBaseMessageBox
where TEventRepository : IBaseEventRepository<TEventBox>
{
private readonly IServiceProvider _services;
private readonly ILogger _logger;
private readonly InboxOrOutboxStructure _settings;

protected BaseCleanUpProcessedEventsJob(IServiceProvider services,
InboxOrOutboxStructure settings, ILogger logger)
{
_services = services;
_logger = logger;
_settings = settings;
}

public override Task StartAsync(CancellationToken cancellationToken)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var scope = _services.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<TEventRepository>();
repository.CreateTableIfNotExists();
if (settings.DaysToCleanUpEvents <= 0) return;

return base.StartAsync(cancellationToken);
}
await CreateEventStoreTablesIfNotExistsAsync(stoppingToken);

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (_settings.DaysToCleanUpEvents <= 0) return;

var timeToDelay = TimeSpan.FromHours(_settings.HoursToDelayCleanUpEvents);
var timeToDelay = TimeSpan.FromHours(settings.HoursToDelayCleanUpEvents);
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _services.CreateScope();
using var scope = scopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<TEventRepository>();
var processedAt = DateTime.Now.AddDays(-_settings.DaysToCleanUpEvents);
var processedAt = DateTime.Now.AddDays(-settings.DaysToCleanUpEvents);
await repository.DeleteProcessedEventsAsync(processedAt);
}
catch (Exception e)
{
_logger.LogCritical(e,
logger.LogCritical(e,
"Something is wrong while cleaning up the processed events from the {TableName} table. Happened at: {time}",
_settings.TableName, DateTime.Now);
settings.TableName, DateTime.Now);
}
finally
{
await Task.Delay(timeToDelay, stoppingToken);
if (!stoppingToken.IsCancellationRequested)
await Task.Delay(timeToDelay, stoppingToken);
}
}
}

#region CreateEventStoreTablesIfNotExists

/// <summary>
/// Creates event store tables if they do not already exist. It waits for a configured delay before attempting to create the tables.
/// </summary>
private async Task CreateEventStoreTablesIfNotExistsAsync(CancellationToken cancellationToken)
{
using var scope = scopeFactory.CreateScope();
var eventStoreTablesCreator = scope.ServiceProvider.GetRequiredService<IEventStoreTablesCreator>();
await eventStoreTablesCreator.CreateTablesIfNotExistsAsync(cancellationToken);
}

#endregion
}
29 changes: 5 additions & 24 deletions src/BackgroundServices/BaseEventsProcessorJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
finally
{
await Task.Delay(_timeToDelay, stoppingToken);
if (!stoppingToken.IsCancellationRequested)
await Task.Delay(_timeToDelay, stoppingToken);
}
}
}
Expand All @@ -53,34 +54,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

#region CreateEventStoreTablesIfNotExists

/// <summary>
/// Semaphore to limit the number of concurrent table creation to 1.
/// This is to prevent multiple instances of the application from trying to run migrations at the same time.
/// </summary>
private static readonly SemaphoreSlim LimitToExecuteTableCreation = new(1, 1);

/// <summary>
/// Creates event store tables if they do not already exist. It waits for a configured delay before attempting to create the tables.
/// </summary>
private async Task CreateEventStoreTablesIfNotExistsAsync(CancellationToken cancellationToken)
{
Console.WriteLine("Starting IEventStoreTablesCreator. Seconds to delay: {0}" , functionalitySettings.SecondsToDelayBeforeCreateEventStoreTables);
var timeToDelay = TimeSpan.FromSeconds(functionalitySettings.SecondsToDelayBeforeCreateEventStoreTables);
await Task.Delay(timeToDelay, cancellationToken);
await LimitToExecuteTableCreation.WaitAsync(cancellationToken);

try
{
Console.WriteLine("Started IEventStoreTablesCreator...");
using var scope = scopeFactory.CreateScope();
var eventStoreTablesCreator = scope.ServiceProvider.GetRequiredService<IEventStoreTablesCreator>();
eventStoreTablesCreator.CreateTablesIfNotExists();
Console.WriteLine("Finished IEventStoreTablesCreator...");
}
finally
{
LimitToExecuteTableCreation.Release();
}
using var scope = scopeFactory.CreateScope();
var eventStoreTablesCreator = scope.ServiceProvider.GetRequiredService<IEventStoreTablesCreator>();
await eventStoreTablesCreator.CreateTablesIfNotExistsAsync(cancellationToken);
}

#endregion
Expand Down
6 changes: 6 additions & 0 deletions src/Configurations/InboxAndOutboxSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ namespace EventStorage.Configurations;

public class InboxAndOutboxSettings
{
/// <summary>
/// Seconds to delay before creating event store tables. Default value is "0".
/// Sometime, we may need to wait for other systems to create database itself before start processing events.
/// </summary>
public int SecondsToDelayBeforeCreateEventStoreTables { get; init; }

/// <summary>
/// For getting settings of an Inbox.
/// </summary>
Expand Down
6 changes: 0 additions & 6 deletions src/Configurations/InboxOrOutboxStructure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ public record InboxOrOutboxStructure
/// </summary>
public int SecondsToDelayProcessEvents { get; init; } = 1;

/// <summary>
/// Seconds to delay before creating event store tables. Default value is "0".
/// Sometime, we may need to wait for other systems to create database itself before start processing events.
/// </summary>
public int SecondsToDelayBeforeCreateEventStoreTables { get; init; } = 0;

/// <summary>
/// Days to cleaning up the processed events. Default value is "0". It will work when value is higher than or equal 1.
/// </summary>
Expand Down
15 changes: 7 additions & 8 deletions src/Inbox/BackgroundServices/CleanUpProcessedInboxEventsJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
using EventStorage.Inbox.Models;
using EventStorage.Inbox.Repositories;
using EventStorage.Outbox.BackgroundServices;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace EventStorage.Inbox.BackgroundServices;

internal class CleanUpProcessedInboxEventsJob : BaseCleanUpProcessedEventsJob<IInboxRepository, InboxMessage>
{
public CleanUpProcessedInboxEventsJob(IServiceProvider services,
InboxAndOutboxSettings settings, ILogger<OutboxEventsProcessorJob> logger) : base(services, settings.Inbox,
logger)
{
}
}
internal class CleanUpProcessedInboxEventsJob(
IServiceScopeFactory serviceScopeFactory,
InboxAndOutboxSettings settings,
ILogger<OutboxEventsProcessorJob> logger)
: BaseCleanUpProcessedEventsJob<IInboxRepository, InboxMessage>(serviceScopeFactory, settings.Inbox,
logger);
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
using EventStorage.Configurations;
using EventStorage.Outbox.Models;
using EventStorage.Outbox.Repositories;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace EventStorage.Outbox.BackgroundServices;

internal class CleanUpProcessedOutboxEventsJob : BaseCleanUpProcessedEventsJob<IOutboxRepository, OutboxMessage>
{
public CleanUpProcessedOutboxEventsJob(IServiceProvider services,
InboxAndOutboxSettings settings, ILogger<OutboxEventsProcessorJob> logger) : base(services, settings.Outbox,
logger)
{
}
}
internal class CleanUpProcessedOutboxEventsJob(
IServiceScopeFactory scopeFactory,
InboxAndOutboxSettings settings,
ILogger<OutboxEventsProcessorJob> logger)
: BaseCleanUpProcessedEventsJob<IOutboxRepository, OutboxMessage>(scopeFactory, settings.Outbox,
logger);
28 changes: 24 additions & 4 deletions src/Services/EventStoreTablesCreator.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
using EventStorage.Inbox.Repositories;
using EventStorage.Configurations;
using EventStorage.Inbox.Repositories;
using EventStorage.Outbox.Repositories;

namespace EventStorage.Services;

internal class EventStoreTablesCreator(
InboxAndOutboxSettings settings,
IInboxRepository inboxRepository = null,
IOutboxRepository outboxRepository = null) : IEventStoreTablesCreator
{
public void CreateTablesIfNotExists()
/// <summary>
/// Semaphore to limit the number of concurrent table creation to 1.
/// This is to prevent multiple instances of the application from trying to run migrations at the same time.
/// </summary>
private static readonly SemaphoreSlim LimitToExecuteTableCreation = new(1, 1);

public async Task CreateTablesIfNotExistsAsync(CancellationToken cancellationToken)
{
inboxRepository?.CreateTableIfNotExists();
outboxRepository?.CreateTableIfNotExists();
var timeToDelay = TimeSpan.FromSeconds(settings.SecondsToDelayBeforeCreateEventStoreTables);
await Task.Delay(timeToDelay, cancellationToken);

await LimitToExecuteTableCreation.WaitAsync(cancellationToken);

try
{
inboxRepository?.CreateTableIfNotExists();
outboxRepository?.CreateTableIfNotExists();
}
finally
{
LimitToExecuteTableCreation.Release();
}
}
}
2 changes: 1 addition & 1 deletion src/Services/IEventStoreTablesCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ public interface IEventStoreTablesCreator
/// <summary>
/// Creates Inbox and Outbox tables if they do not already exist.
/// </summary>
void CreateTablesIfNotExists();
Task CreateTablesIfNotExistsAsync(CancellationToken cancellationToken);
}
14 changes: 7 additions & 7 deletions tests/Services/CleanUpProcessedEventsJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
using EventStorage.Configurations;
using EventStorage.Models;
using EventStorage.Repositories;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace EventStorage.Tests.Services;

internal class CleanUpProcessedEventsJob<TEventRepository, TEventBox> : BaseCleanUpProcessedEventsJob<TEventRepository, TEventBox>
internal class CleanUpProcessedEventsJob<TEventRepository, TEventBox>(
IServiceScopeFactory scopeFactory,
InboxOrOutboxStructure settings,
ILogger logger)
: BaseCleanUpProcessedEventsJob<TEventRepository, TEventBox>(scopeFactory, settings, logger)
where TEventBox : class, IBaseMessageBox
where TEventRepository : IBaseEventRepository<TEventBox>
{
public CleanUpProcessedEventsJob(IServiceProvider services, InboxOrOutboxStructure settings, ILogger logger) : base(services, settings, logger)
{
}
}
where TEventRepository : IBaseEventRepository<TEventBox>;
Loading