Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,7 @@ FakesAssemblies/

# Ignoring application files
appsettings.Development.*
SwaggerDocs.*
SwaggerDocs.*

# Claude files
*.claude
6 changes: 3 additions & 3 deletions src/Outbox/Repositories/OutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal class OutboxRepository(ILogger<OutboxRepository> logger, InboxAndOutbox
provider VARCHAR(50) NOT NULL,
event_name VARCHAR(100) NOT NULL,
event_path VARCHAR(255),
payload TEXT,
payload JSONB,
headers TEXT,
additional_data TEXT,
created_at TIMESTAMP(0) NOT NULL,
Expand All @@ -39,7 +39,7 @@ processed_at TIMESTAMP(0) DEFAULT NULL
id, provider, event_name, event_path, payload, headers,
additional_data, created_at, try_count, try_after_at
) VALUES (
@Id, @Provider, @EventName, @EventPath, @Payload, @Headers,
@Id, @Provider, @EventName, @EventPath, @Payload::jsonb, @Headers,
@AdditionalData, @CreatedAt, @TryCount, @TryAfterAt
)";

Expand All @@ -49,7 +49,7 @@ processed_at TIMESTAMP(0) DEFAULT NULL
protected override string SqlQueryToGetUnprocessedEvents => $@"
SELECT id as ""{nameof(OutboxMessage.Id)}"", provider as ""{nameof(OutboxMessage.Provider)}"",
event_name as ""{nameof(OutboxMessage.EventName)}"", event_path as ""{nameof(OutboxMessage.EventPath)}"",
payload as ""{nameof(OutboxMessage.Payload)}"", headers as ""{nameof(OutboxMessage.Headers)}"",
payload::text as ""{nameof(OutboxMessage.Payload)}"", headers as ""{nameof(OutboxMessage.Headers)}"",
additional_data as ""{nameof(OutboxMessage.AdditionalData)}"", created_at as ""{nameof(OutboxMessage.CreatedAt)}"",
try_count as ""{nameof(OutboxMessage.TryCount)}"", try_after_at as ""{nameof(OutboxMessage.TryAfterAt)}"",
processed_at as ""{nameof(OutboxMessage.ProcessedAt)}""
Expand Down
47 changes: 35 additions & 12 deletions src/Repositories/BaseEventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal abstract class BaseEventRepository<TBaseMessage>(ILogger logger, InboxO
where TBaseMessage : class, IBaseMessageBox
{
private readonly string _connectionString = settings.ConnectionString;

/// <summary>
/// The name of the table for connecting repository to the correct table in the database.
/// </summary>
Expand All @@ -32,7 +32,7 @@ internal abstract class BaseEventRepository<TBaseMessage>(ILogger logger, InboxO
/// </summary>
protected abstract string TraceMessageTag { get; }

#region CreateTableIfNotExists
#region Create tables or indexes if not exists

/// <summary>
/// The SQL script for creating the table for storing events if it does not exist.
Expand All @@ -43,7 +43,7 @@ internal abstract class BaseEventRepository<TBaseMessage>(ILogger logger, InboxO
provider VARCHAR(50) NOT NULL,
event_name VARCHAR(100) NOT NULL,
event_path VARCHAR(255),
payload TEXT,
payload JSONB,
headers TEXT,
additional_data TEXT,
naming_policy_type VARCHAR(15),
Expand All @@ -53,21 +53,44 @@ try_after_at TIMESTAMP(0) NOT NULL,
processed_at TIMESTAMP(0) DEFAULT NULL
);";

/// <summary>
/// The SQL script for migrating the payload column from text to jsonb type if the column exists and has text type.
/// This is for supporting the old versions of the library which used text type for payload column.
/// </summary>
private string MigratePayloadColumnToJsonbScript => $@"
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = '{TableName}'
AND column_name = 'payload'
AND data_type = 'text'
) THEN
ALTER TABLE {TableName} ALTER COLUMN payload TYPE JSONB USING payload::jsonb;
END IF;
END
$$;";

/// <summary>
/// The SQL script for creating indexes for the table. It creates an index for getting unprocessed events and another index for deleting processed events.
/// </summary>
private string CreateIndexesScript => $@"CREATE INDEX IF NOT EXISTS idx_for_get_unprocessed_events_of_{TableName}
ON public.{TableName} (processed_at, try_after_at);

CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName}
ON public.{TableName} (processed_at);";

public void CreateTableIfNotExists()
{
try
{
using var dbConnection = new NpgsqlConnection(_connectionString);
dbConnection.Open();

var createIndexScripts = $@"CREATE INDEX IF NOT EXISTS idx_for_get_unprocessed_events_of_{TableName}
ON public.{TableName} (processed_at, try_after_at);

CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName}
ON public.{TableName} (processed_at);";

dbConnection.Execute(CreateTableSqlScript);
dbConnection.Execute(createIndexScripts);
dbConnection.Execute(MigratePayloadColumnToJsonbScript);
dbConnection.Execute(CreateIndexesScript);
}
catch (Exception e)
{
Expand All @@ -87,7 +110,7 @@ CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName}
id, provider, event_name, event_path, payload, headers,
additional_data, naming_policy_type, created_at, try_count, try_after_at
) VALUES (
@Id, @Provider, @EventName, @EventPath, @Payload, @Headers,
@Id, @Provider, @EventName, @EventPath, @Payload::jsonb, @Headers,
@AdditionalData, @NamingPolicyType, @CreatedAt, @TryCount, @TryAfterAt
)";

Expand Down Expand Up @@ -188,7 +211,7 @@ public bool BulkInsertEvents(TBaseMessage[] events)
protected virtual string SqlQueryToGetUnprocessedEvents => $@"
SELECT id as ""{nameof(IBaseMessageBox.Id)}"", provider as ""{nameof(IBaseMessageBox.Provider)}"",
event_name as ""{nameof(IBaseMessageBox.EventName)}"", event_path as ""{nameof(IBaseMessageBox.EventPath)}"",
payload as ""{nameof(IBaseMessageBox.Payload)}"", headers as ""{nameof(IBaseMessageBox.Headers)}"",
payload::text as ""{nameof(IBaseMessageBox.Payload)}"", headers as ""{nameof(IBaseMessageBox.Headers)}"",
naming_policy_type as ""{nameof(IBaseMessageBox.NamingPolicyType)}"",
additional_data as ""{nameof(IBaseMessageBox.AdditionalData)}"", created_at as ""{nameof(IBaseMessageBox.CreatedAt)}"",
try_count as ""{nameof(IBaseMessageBox.TryCount)}"", try_after_at as ""{nameof(IBaseMessageBox.TryAfterAt)}"",
Expand Down
32 changes: 16 additions & 16 deletions tests/UnitTests/BaseEventRepositoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void InsertEvent_OneItem_EventShouldBeInserted()
Provider = "TestProvider",
EventName = "TestEvent",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0,
Expand Down Expand Up @@ -71,7 +71,7 @@ public async Task InsertEventAsync_OneItem_EventShouldBeInserted()
Provider = "TestProvider",
EventName = "TestEvent",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0,
Expand Down Expand Up @@ -100,7 +100,7 @@ public void BulkInsertEvents_AddedTwoItems_BothEventsShouldBeInserted()
Provider = "TestProvider",
EventName = "TestEvent1",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0,
Expand All @@ -112,7 +112,7 @@ public void BulkInsertEvents_AddedTwoItems_BothEventsShouldBeInserted()
Provider = "TestProvider",
EventName = "TestEvent2",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0,
Expand Down Expand Up @@ -144,7 +144,7 @@ public async Task BulkInsertEventsAsync_AddedTwoItems_BothEventsShouldBeInserted
Provider = "TestProvider",
EventName = "TestEvent1",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0,
Expand All @@ -156,7 +156,7 @@ public async Task BulkInsertEventsAsync_AddedTwoItems_BothEventsShouldBeInserted
Provider = "TestProvider",
EventName = "TestEvent2",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
NamingPolicyType = NamingPolicyTypeNames.PascalCase,
Expand Down Expand Up @@ -189,7 +189,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents()
Provider = "TestProvider1",
EventName = "TestEvent1" + typeof(TEvent).FullName,
EventPath = "/test/path1",
Payload = "TestPayload1",
Payload = "{}",
Headers = "TestHeaders1",
AdditionalData = "TestAdditionalData1",
TryCount = 0,
Expand All @@ -202,7 +202,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents()
Provider = "TestProvider2",
EventName = "TestEvent2" + typeof(TEvent).FullName,
EventPath = "/test/path2",
Payload = "TestPayload2",
Payload = "{}",
Headers = "TestHeaders2",
AdditionalData = "TestAdditionalData2",
TryCount = 0,
Expand All @@ -215,7 +215,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents()
Provider = "TestProvider3",
EventName = "TestEvent3" + typeof(TEvent).FullName,
EventPath = "/test/path2",
Payload = "TestPayload2",
Payload = "{}",
Headers = "TestHeaders2",
AdditionalData = "TestAdditionalData2",
TryCount = 0,
Expand Down Expand Up @@ -248,7 +248,7 @@ public async Task UpdateEventAsync_OneItem_ShouldUpdateEvent()
Provider = "TestProvider",
EventName = "TestEvent",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0
Expand Down Expand Up @@ -285,7 +285,7 @@ public async Task UpdateEventsAsync_TwoItems_ShouldUpdateEvents()
Provider = "TestProvider1",
EventName = "TestEvent1",
EventPath = "/test/path1",
Payload = "TestPayload1",
Payload = "{}",
Headers = "TestHeaders1",
AdditionalData = "TestAdditionalData1",
TryCount = 0
Expand All @@ -297,7 +297,7 @@ public async Task UpdateEventsAsync_TwoItems_ShouldUpdateEvents()
Provider = "TestProvider2",
EventName = "TestEvent2",
EventPath = "/test/path2",
Payload = "TestPayload2",
Payload = "{}",
Headers = "TestHeaders2",
AdditionalData = "TestAdditionalData2",
TryCount = 0
Expand Down Expand Up @@ -343,7 +343,7 @@ public async Task IsEventProcessedAsync_EventExistsAndProcessed_ShouldReturnTrue
Provider = "TestProvider",
EventName = "ProcessedEvent",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 1,
Expand All @@ -368,7 +368,7 @@ public async Task IsEventProcessedAsync_EventExistsButNotProcessed_ShouldReturnF
Provider = "TestProvider",
EventName = "UnprocessedEvent",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0,
Expand Down Expand Up @@ -406,7 +406,7 @@ public async Task DeleteProcessedEventsAsync_OneItems_ShouldDeleteProcessedEvent
Provider = "TestProvider1",
EventName = "TestEvent1",
EventPath = "/test/path1",
Payload = "TestPayload1",
Payload = "{}",
Headers = "TestHeaders1",
AdditionalData = "TestAdditionalData1",
TryCount = 0
Expand All @@ -418,7 +418,7 @@ public async Task DeleteProcessedEventsAsync_OneItems_ShouldDeleteProcessedEvent
Provider = "TestProvider2",
EventName = "TestEvent2",
EventPath = "/test/path2",
Payload = "TestPayload2",
Payload = "{}",
Headers = "TestHeaders2",
AdditionalData = "TestAdditionalData2",
TryCount = 0
Expand Down
2 changes: 1 addition & 1 deletion tests/UnitTests/Inbox/InboxRepositoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void InsertEvent_PassingNamingPolicyType_StoresEventWithCorrectNamingPoli
Provider = "TestProvider",
EventName = "TestEvent1",
EventPath = "/test/path",
Payload = "TestPayload",
Payload = "{}",
Headers = "TestHeaders",
AdditionalData = "TestAdditionalData",
TryCount = 0,
Expand Down