From eedd652fa737cfcde2f0ee7ae21f1b4c06f3ab85 Mon Sep 17 00:00:00 2001 From: Mirolim Majidov Date: Mon, 2 Mar 2026 11:54:48 +0500 Subject: [PATCH 1/3] Migrate payload column type to JSONB in BaseEventRepository and OutboxRepository; update .gitignore to include Claude files --- .gitignore | 5 +++- src/Outbox/Repositories/OutboxRepository.cs | 6 ++--- src/Repositories/BaseEventRepository.cs | 29 +++++++++++++++++---- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 1f8c511..3eb8faa 100644 --- a/.gitignore +++ b/.gitignore @@ -188,4 +188,7 @@ FakesAssemblies/ # Ignoring application files appsettings.Development.* -SwaggerDocs.* \ No newline at end of file +SwaggerDocs.* + +# Claude files +*.claude \ No newline at end of file diff --git a/src/Outbox/Repositories/OutboxRepository.cs b/src/Outbox/Repositories/OutboxRepository.cs index e514e41..2d9a6e9 100644 --- a/src/Outbox/Repositories/OutboxRepository.cs +++ b/src/Outbox/Repositories/OutboxRepository.cs @@ -22,7 +22,7 @@ internal class OutboxRepository(ILogger logger, InboxAndOutbox provider VARCHAR(50) NOT NULL, event_name VARCHAR(100) NOT NULL, event_path VARCHAR(255), - payload TEXT, + payload JSONB, headers TEXT, additional_data TEXT, created_at TIMESTAMP(0) NOT NULL, @@ -39,7 +39,7 @@ processed_at TIMESTAMP(0) DEFAULT NULL id, provider, event_name, event_path, payload, headers, additional_data, created_at, try_count, try_after_at ) VALUES ( - @Id, @Provider, @EventName, @EventPath, @Payload, @Headers, + @Id, @Provider, @EventName, @EventPath, @Payload::jsonb, @Headers, @AdditionalData, @CreatedAt, @TryCount, @TryAfterAt )"; @@ -49,7 +49,7 @@ processed_at TIMESTAMP(0) DEFAULT NULL protected override string SqlQueryToGetUnprocessedEvents => $@" SELECT id as ""{nameof(OutboxMessage.Id)}"", provider as ""{nameof(OutboxMessage.Provider)}"", event_name as ""{nameof(OutboxMessage.EventName)}"", event_path as ""{nameof(OutboxMessage.EventPath)}"", - payload as ""{nameof(OutboxMessage.Payload)}"", headers as ""{nameof(OutboxMessage.Headers)}"", + payload::text as ""{nameof(OutboxMessage.Payload)}"", headers as ""{nameof(OutboxMessage.Headers)}"", additional_data as ""{nameof(OutboxMessage.AdditionalData)}"", created_at as ""{nameof(OutboxMessage.CreatedAt)}"", try_count as ""{nameof(OutboxMessage.TryCount)}"", try_after_at as ""{nameof(OutboxMessage.TryAfterAt)}"", processed_at as ""{nameof(OutboxMessage.ProcessedAt)}"" diff --git a/src/Repositories/BaseEventRepository.cs b/src/Repositories/BaseEventRepository.cs index dbe97a5..2763735 100644 --- a/src/Repositories/BaseEventRepository.cs +++ b/src/Repositories/BaseEventRepository.cs @@ -43,7 +43,7 @@ internal abstract class BaseEventRepository(ILogger logger, InboxO provider VARCHAR(50) NOT NULL, event_name VARCHAR(100) NOT NULL, event_path VARCHAR(255), - payload TEXT, + payload JSONB, headers TEXT, additional_data TEXT, naming_policy_type VARCHAR(15), @@ -53,6 +53,21 @@ try_after_at TIMESTAMP(0) NOT NULL, processed_at TIMESTAMP(0) DEFAULT NULL );"; + private string MigratePayloadColumnToJsonbScript => $@" + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = '{TableName}' + AND column_name = 'payload' + AND data_type = 'text' + ) THEN + ALTER TABLE {TableName} ALTER COLUMN payload TYPE JSONB USING payload::jsonb; + END IF; + END + $$;"; + public void CreateTableIfNotExists() { try @@ -60,13 +75,17 @@ public void CreateTableIfNotExists() using var dbConnection = new NpgsqlConnection(_connectionString); dbConnection.Open(); + var tableAlreadyExists = dbConnection.ExecuteScalar( + $"SELECT EXISTS (SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = '{TableName}')"); + + var scriptToExecute = tableAlreadyExists ? MigratePayloadColumnToJsonbScript : CreateTableSqlScript; + dbConnection.Execute(scriptToExecute); + var createIndexScripts = $@"CREATE INDEX IF NOT EXISTS idx_for_get_unprocessed_events_of_{TableName} ON public.{TableName} (processed_at, try_after_at); CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName} ON public.{TableName} (processed_at);"; - - dbConnection.Execute(CreateTableSqlScript); dbConnection.Execute(createIndexScripts); } catch (Exception e) @@ -87,7 +106,7 @@ CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName} id, provider, event_name, event_path, payload, headers, additional_data, naming_policy_type, created_at, try_count, try_after_at ) VALUES ( - @Id, @Provider, @EventName, @EventPath, @Payload, @Headers, + @Id, @Provider, @EventName, @EventPath, @Payload::jsonb, @Headers, @AdditionalData, @NamingPolicyType, @CreatedAt, @TryCount, @TryAfterAt )"; @@ -188,7 +207,7 @@ public bool BulkInsertEvents(TBaseMessage[] events) protected virtual string SqlQueryToGetUnprocessedEvents => $@" SELECT id as ""{nameof(IBaseMessageBox.Id)}"", provider as ""{nameof(IBaseMessageBox.Provider)}"", event_name as ""{nameof(IBaseMessageBox.EventName)}"", event_path as ""{nameof(IBaseMessageBox.EventPath)}"", - payload as ""{nameof(IBaseMessageBox.Payload)}"", headers as ""{nameof(IBaseMessageBox.Headers)}"", + payload::text as ""{nameof(IBaseMessageBox.Payload)}"", headers as ""{nameof(IBaseMessageBox.Headers)}"", naming_policy_type as ""{nameof(IBaseMessageBox.NamingPolicyType)}"", additional_data as ""{nameof(IBaseMessageBox.AdditionalData)}"", created_at as ""{nameof(IBaseMessageBox.CreatedAt)}"", try_count as ""{nameof(IBaseMessageBox.TryCount)}"", try_after_at as ""{nameof(IBaseMessageBox.TryAfterAt)}"", From 1e0bf880988b1fc041a9c045f2b9ca297d183c94 Mon Sep 17 00:00:00 2001 From: Mirolim Majidov Date: Mon, 2 Mar 2026 12:50:24 +0500 Subject: [PATCH 2/3] Refactor BaseEventRepository to improve table creation logic and add migration script for payload column to JSONB type --- src/Repositories/BaseEventRepository.cs | 32 ++++++++++++++----------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/Repositories/BaseEventRepository.cs b/src/Repositories/BaseEventRepository.cs index 2763735..d242c9c 100644 --- a/src/Repositories/BaseEventRepository.cs +++ b/src/Repositories/BaseEventRepository.cs @@ -21,7 +21,7 @@ internal abstract class BaseEventRepository(ILogger logger, InboxO where TBaseMessage : class, IBaseMessageBox { private readonly string _connectionString = settings.ConnectionString; - + /// /// The name of the table for connecting repository to the correct table in the database. /// @@ -32,7 +32,7 @@ internal abstract class BaseEventRepository(ILogger logger, InboxO /// protected abstract string TraceMessageTag { get; } - #region CreateTableIfNotExists + #region Create tables or indexes if not exists /// /// The SQL script for creating the table for storing events if it does not exist. @@ -53,6 +53,10 @@ try_after_at TIMESTAMP(0) NOT NULL, processed_at TIMESTAMP(0) DEFAULT NULL );"; + /// + /// The SQL script for migrating the payload column from text to jsonb type if the column exists and has text type. + /// This is for supporting the old versions of the library which used text type for payload column. + /// private string MigratePayloadColumnToJsonbScript => $@" DO $$ BEGIN @@ -68,6 +72,15 @@ SELECT 1 FROM information_schema.columns END $$;"; + /// + /// The SQL script for creating indexes for the table. It creates an index for getting unprocessed events and another index for deleting processed events. + /// + private string CreateIndexesScript => $@"CREATE INDEX IF NOT EXISTS idx_for_get_unprocessed_events_of_{TableName} + ON public.{TableName} (processed_at, try_after_at); + + CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName} + ON public.{TableName} (processed_at);"; + public void CreateTableIfNotExists() { try @@ -75,18 +88,9 @@ public void CreateTableIfNotExists() using var dbConnection = new NpgsqlConnection(_connectionString); dbConnection.Open(); - var tableAlreadyExists = dbConnection.ExecuteScalar( - $"SELECT EXISTS (SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = '{TableName}')"); - - var scriptToExecute = tableAlreadyExists ? MigratePayloadColumnToJsonbScript : CreateTableSqlScript; - dbConnection.Execute(scriptToExecute); - - var createIndexScripts = $@"CREATE INDEX IF NOT EXISTS idx_for_get_unprocessed_events_of_{TableName} - ON public.{TableName} (processed_at, try_after_at); - - CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName} - ON public.{TableName} (processed_at);"; - dbConnection.Execute(createIndexScripts); + dbConnection.Execute(CreateTableSqlScript); + dbConnection.Execute(MigratePayloadColumnToJsonbScript); + dbConnection.Execute(CreateIndexesScript); } catch (Exception e) { From b9a45f742b47bf00b94c6bfefbaa6625a2ac344a Mon Sep 17 00:00:00 2001 From: Mirolim Majidov Date: Mon, 2 Mar 2026 13:17:55 +0500 Subject: [PATCH 3/3] Update test payloads to use empty JSON objects for consistency --- tests/UnitTests/BaseEventRepositoryTests.cs | 32 +++++++++---------- tests/UnitTests/Inbox/InboxRepositoryTests.cs | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/UnitTests/BaseEventRepositoryTests.cs b/tests/UnitTests/BaseEventRepositoryTests.cs index 8736b59..d02c0e2 100644 --- a/tests/UnitTests/BaseEventRepositoryTests.cs +++ b/tests/UnitTests/BaseEventRepositoryTests.cs @@ -41,7 +41,7 @@ public void InsertEvent_OneItem_EventShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -71,7 +71,7 @@ public async Task InsertEventAsync_OneItem_EventShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -100,7 +100,7 @@ public void BulkInsertEvents_AddedTwoItems_BothEventsShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent1", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -112,7 +112,7 @@ public void BulkInsertEvents_AddedTwoItems_BothEventsShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent2", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -144,7 +144,7 @@ public async Task BulkInsertEventsAsync_AddedTwoItems_BothEventsShouldBeInserted Provider = "TestProvider", EventName = "TestEvent1", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -156,7 +156,7 @@ public async Task BulkInsertEventsAsync_AddedTwoItems_BothEventsShouldBeInserted Provider = "TestProvider", EventName = "TestEvent2", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", NamingPolicyType = NamingPolicyTypeNames.PascalCase, @@ -189,7 +189,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents() Provider = "TestProvider1", EventName = "TestEvent1" + typeof(TEvent).FullName, EventPath = "/test/path1", - Payload = "TestPayload1", + Payload = "{}", Headers = "TestHeaders1", AdditionalData = "TestAdditionalData1", TryCount = 0, @@ -202,7 +202,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents() Provider = "TestProvider2", EventName = "TestEvent2" + typeof(TEvent).FullName, EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0, @@ -215,7 +215,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents() Provider = "TestProvider3", EventName = "TestEvent3" + typeof(TEvent).FullName, EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0, @@ -248,7 +248,7 @@ public async Task UpdateEventAsync_OneItem_ShouldUpdateEvent() Provider = "TestProvider", EventName = "TestEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0 @@ -285,7 +285,7 @@ public async Task UpdateEventsAsync_TwoItems_ShouldUpdateEvents() Provider = "TestProvider1", EventName = "TestEvent1", EventPath = "/test/path1", - Payload = "TestPayload1", + Payload = "{}", Headers = "TestHeaders1", AdditionalData = "TestAdditionalData1", TryCount = 0 @@ -297,7 +297,7 @@ public async Task UpdateEventsAsync_TwoItems_ShouldUpdateEvents() Provider = "TestProvider2", EventName = "TestEvent2", EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0 @@ -343,7 +343,7 @@ public async Task IsEventProcessedAsync_EventExistsAndProcessed_ShouldReturnTrue Provider = "TestProvider", EventName = "ProcessedEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 1, @@ -368,7 +368,7 @@ public async Task IsEventProcessedAsync_EventExistsButNotProcessed_ShouldReturnF Provider = "TestProvider", EventName = "UnprocessedEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -406,7 +406,7 @@ public async Task DeleteProcessedEventsAsync_OneItems_ShouldDeleteProcessedEvent Provider = "TestProvider1", EventName = "TestEvent1", EventPath = "/test/path1", - Payload = "TestPayload1", + Payload = "{}", Headers = "TestHeaders1", AdditionalData = "TestAdditionalData1", TryCount = 0 @@ -418,7 +418,7 @@ public async Task DeleteProcessedEventsAsync_OneItems_ShouldDeleteProcessedEvent Provider = "TestProvider2", EventName = "TestEvent2", EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0 diff --git a/tests/UnitTests/Inbox/InboxRepositoryTests.cs b/tests/UnitTests/Inbox/InboxRepositoryTests.cs index e41e8d8..392cb00 100644 --- a/tests/UnitTests/Inbox/InboxRepositoryTests.cs +++ b/tests/UnitTests/Inbox/InboxRepositoryTests.cs @@ -31,7 +31,7 @@ public void InsertEvent_PassingNamingPolicyType_StoresEventWithCorrectNamingPoli Provider = "TestProvider", EventName = "TestEvent1", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0,