diff --git a/.gitignore b/.gitignore index 1f8c511..3eb8faa 100644 --- a/.gitignore +++ b/.gitignore @@ -188,4 +188,7 @@ FakesAssemblies/ # Ignoring application files appsettings.Development.* -SwaggerDocs.* \ No newline at end of file +SwaggerDocs.* + +# Claude files +*.claude \ No newline at end of file diff --git a/src/Outbox/Repositories/OutboxRepository.cs b/src/Outbox/Repositories/OutboxRepository.cs index e514e41..2d9a6e9 100644 --- a/src/Outbox/Repositories/OutboxRepository.cs +++ b/src/Outbox/Repositories/OutboxRepository.cs @@ -22,7 +22,7 @@ internal class OutboxRepository(ILogger logger, InboxAndOutbox provider VARCHAR(50) NOT NULL, event_name VARCHAR(100) NOT NULL, event_path VARCHAR(255), - payload TEXT, + payload JSONB, headers TEXT, additional_data TEXT, created_at TIMESTAMP(0) NOT NULL, @@ -39,7 +39,7 @@ processed_at TIMESTAMP(0) DEFAULT NULL id, provider, event_name, event_path, payload, headers, additional_data, created_at, try_count, try_after_at ) VALUES ( - @Id, @Provider, @EventName, @EventPath, @Payload, @Headers, + @Id, @Provider, @EventName, @EventPath, @Payload::jsonb, @Headers, @AdditionalData, @CreatedAt, @TryCount, @TryAfterAt )"; @@ -49,7 +49,7 @@ processed_at TIMESTAMP(0) DEFAULT NULL protected override string SqlQueryToGetUnprocessedEvents => $@" SELECT id as ""{nameof(OutboxMessage.Id)}"", provider as ""{nameof(OutboxMessage.Provider)}"", event_name as ""{nameof(OutboxMessage.EventName)}"", event_path as ""{nameof(OutboxMessage.EventPath)}"", - payload as ""{nameof(OutboxMessage.Payload)}"", headers as ""{nameof(OutboxMessage.Headers)}"", + payload::text as ""{nameof(OutboxMessage.Payload)}"", headers as ""{nameof(OutboxMessage.Headers)}"", additional_data as ""{nameof(OutboxMessage.AdditionalData)}"", created_at as ""{nameof(OutboxMessage.CreatedAt)}"", try_count as ""{nameof(OutboxMessage.TryCount)}"", try_after_at as ""{nameof(OutboxMessage.TryAfterAt)}"", processed_at as ""{nameof(OutboxMessage.ProcessedAt)}"" diff --git a/src/Repositories/BaseEventRepository.cs b/src/Repositories/BaseEventRepository.cs index dbe97a5..d242c9c 100644 --- a/src/Repositories/BaseEventRepository.cs +++ b/src/Repositories/BaseEventRepository.cs @@ -21,7 +21,7 @@ internal abstract class BaseEventRepository(ILogger logger, InboxO where TBaseMessage : class, IBaseMessageBox { private readonly string _connectionString = settings.ConnectionString; - + /// /// The name of the table for connecting repository to the correct table in the database. /// @@ -32,7 +32,7 @@ internal abstract class BaseEventRepository(ILogger logger, InboxO /// protected abstract string TraceMessageTag { get; } - #region CreateTableIfNotExists + #region Create tables or indexes if not exists /// /// The SQL script for creating the table for storing events if it does not exist. @@ -43,7 +43,7 @@ internal abstract class BaseEventRepository(ILogger logger, InboxO provider VARCHAR(50) NOT NULL, event_name VARCHAR(100) NOT NULL, event_path VARCHAR(255), - payload TEXT, + payload JSONB, headers TEXT, additional_data TEXT, naming_policy_type VARCHAR(15), @@ -53,6 +53,34 @@ try_after_at TIMESTAMP(0) NOT NULL, processed_at TIMESTAMP(0) DEFAULT NULL );"; + /// + /// The SQL script for migrating the payload column from text to jsonb type if the column exists and has text type. + /// This is for supporting the old versions of the library which used text type for payload column. + /// + private string MigratePayloadColumnToJsonbScript => $@" + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = '{TableName}' + AND column_name = 'payload' + AND data_type = 'text' + ) THEN + ALTER TABLE {TableName} ALTER COLUMN payload TYPE JSONB USING payload::jsonb; + END IF; + END + $$;"; + + /// + /// The SQL script for creating indexes for the table. It creates an index for getting unprocessed events and another index for deleting processed events. + /// + private string CreateIndexesScript => $@"CREATE INDEX IF NOT EXISTS idx_for_get_unprocessed_events_of_{TableName} + ON public.{TableName} (processed_at, try_after_at); + + CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName} + ON public.{TableName} (processed_at);"; + public void CreateTableIfNotExists() { try @@ -60,14 +88,9 @@ public void CreateTableIfNotExists() using var dbConnection = new NpgsqlConnection(_connectionString); dbConnection.Open(); - var createIndexScripts = $@"CREATE INDEX IF NOT EXISTS idx_for_get_unprocessed_events_of_{TableName} - ON public.{TableName} (processed_at, try_after_at); - - CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName} - ON public.{TableName} (processed_at);"; - dbConnection.Execute(CreateTableSqlScript); - dbConnection.Execute(createIndexScripts); + dbConnection.Execute(MigratePayloadColumnToJsonbScript); + dbConnection.Execute(CreateIndexesScript); } catch (Exception e) { @@ -87,7 +110,7 @@ CREATE INDEX IF NOT EXISTS idx_for_delete_processed_events_of_{TableName} id, provider, event_name, event_path, payload, headers, additional_data, naming_policy_type, created_at, try_count, try_after_at ) VALUES ( - @Id, @Provider, @EventName, @EventPath, @Payload, @Headers, + @Id, @Provider, @EventName, @EventPath, @Payload::jsonb, @Headers, @AdditionalData, @NamingPolicyType, @CreatedAt, @TryCount, @TryAfterAt )"; @@ -188,7 +211,7 @@ public bool BulkInsertEvents(TBaseMessage[] events) protected virtual string SqlQueryToGetUnprocessedEvents => $@" SELECT id as ""{nameof(IBaseMessageBox.Id)}"", provider as ""{nameof(IBaseMessageBox.Provider)}"", event_name as ""{nameof(IBaseMessageBox.EventName)}"", event_path as ""{nameof(IBaseMessageBox.EventPath)}"", - payload as ""{nameof(IBaseMessageBox.Payload)}"", headers as ""{nameof(IBaseMessageBox.Headers)}"", + payload::text as ""{nameof(IBaseMessageBox.Payload)}"", headers as ""{nameof(IBaseMessageBox.Headers)}"", naming_policy_type as ""{nameof(IBaseMessageBox.NamingPolicyType)}"", additional_data as ""{nameof(IBaseMessageBox.AdditionalData)}"", created_at as ""{nameof(IBaseMessageBox.CreatedAt)}"", try_count as ""{nameof(IBaseMessageBox.TryCount)}"", try_after_at as ""{nameof(IBaseMessageBox.TryAfterAt)}"", diff --git a/tests/UnitTests/BaseEventRepositoryTests.cs b/tests/UnitTests/BaseEventRepositoryTests.cs index 8736b59..d02c0e2 100644 --- a/tests/UnitTests/BaseEventRepositoryTests.cs +++ b/tests/UnitTests/BaseEventRepositoryTests.cs @@ -41,7 +41,7 @@ public void InsertEvent_OneItem_EventShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -71,7 +71,7 @@ public async Task InsertEventAsync_OneItem_EventShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -100,7 +100,7 @@ public void BulkInsertEvents_AddedTwoItems_BothEventsShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent1", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -112,7 +112,7 @@ public void BulkInsertEvents_AddedTwoItems_BothEventsShouldBeInserted() Provider = "TestProvider", EventName = "TestEvent2", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -144,7 +144,7 @@ public async Task BulkInsertEventsAsync_AddedTwoItems_BothEventsShouldBeInserted Provider = "TestProvider", EventName = "TestEvent1", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -156,7 +156,7 @@ public async Task BulkInsertEventsAsync_AddedTwoItems_BothEventsShouldBeInserted Provider = "TestProvider", EventName = "TestEvent2", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", NamingPolicyType = NamingPolicyTypeNames.PascalCase, @@ -189,7 +189,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents() Provider = "TestProvider1", EventName = "TestEvent1" + typeof(TEvent).FullName, EventPath = "/test/path1", - Payload = "TestPayload1", + Payload = "{}", Headers = "TestHeaders1", AdditionalData = "TestAdditionalData1", TryCount = 0, @@ -202,7 +202,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents() Provider = "TestProvider2", EventName = "TestEvent2" + typeof(TEvent).FullName, EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0, @@ -215,7 +215,7 @@ public async Task GetUnprocessedEventsAsync_TwoItems_ShouldReturnPendingEvents() Provider = "TestProvider3", EventName = "TestEvent3" + typeof(TEvent).FullName, EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0, @@ -248,7 +248,7 @@ public async Task UpdateEventAsync_OneItem_ShouldUpdateEvent() Provider = "TestProvider", EventName = "TestEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0 @@ -285,7 +285,7 @@ public async Task UpdateEventsAsync_TwoItems_ShouldUpdateEvents() Provider = "TestProvider1", EventName = "TestEvent1", EventPath = "/test/path1", - Payload = "TestPayload1", + Payload = "{}", Headers = "TestHeaders1", AdditionalData = "TestAdditionalData1", TryCount = 0 @@ -297,7 +297,7 @@ public async Task UpdateEventsAsync_TwoItems_ShouldUpdateEvents() Provider = "TestProvider2", EventName = "TestEvent2", EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0 @@ -343,7 +343,7 @@ public async Task IsEventProcessedAsync_EventExistsAndProcessed_ShouldReturnTrue Provider = "TestProvider", EventName = "ProcessedEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 1, @@ -368,7 +368,7 @@ public async Task IsEventProcessedAsync_EventExistsButNotProcessed_ShouldReturnF Provider = "TestProvider", EventName = "UnprocessedEvent", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0, @@ -406,7 +406,7 @@ public async Task DeleteProcessedEventsAsync_OneItems_ShouldDeleteProcessedEvent Provider = "TestProvider1", EventName = "TestEvent1", EventPath = "/test/path1", - Payload = "TestPayload1", + Payload = "{}", Headers = "TestHeaders1", AdditionalData = "TestAdditionalData1", TryCount = 0 @@ -418,7 +418,7 @@ public async Task DeleteProcessedEventsAsync_OneItems_ShouldDeleteProcessedEvent Provider = "TestProvider2", EventName = "TestEvent2", EventPath = "/test/path2", - Payload = "TestPayload2", + Payload = "{}", Headers = "TestHeaders2", AdditionalData = "TestAdditionalData2", TryCount = 0 diff --git a/tests/UnitTests/Inbox/InboxRepositoryTests.cs b/tests/UnitTests/Inbox/InboxRepositoryTests.cs index e41e8d8..392cb00 100644 --- a/tests/UnitTests/Inbox/InboxRepositoryTests.cs +++ b/tests/UnitTests/Inbox/InboxRepositoryTests.cs @@ -31,7 +31,7 @@ public void InsertEvent_PassingNamingPolicyType_StoresEventWithCorrectNamingPoli Provider = "TestProvider", EventName = "TestEvent1", EventPath = "/test/path", - Payload = "TestPayload", + Payload = "{}", Headers = "TestHeaders", AdditionalData = "TestAdditionalData", TryCount = 0,