From 75988f3718b2d1290a7a5dc70ed7ae32abc329d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Zborek?= Date: Thu, 5 Feb 2026 21:08:48 +0100 Subject: [PATCH 1/4] refactor(csharp): refactor message expiry handling to use TimeSpan and add DurationHelpers --- foreign/csharp/Benchmarks/Program.cs | 6 +- .../Fixtures/FetchMessagesFixture.cs | 6 +- .../Fixtures/FlushMessageFixture.cs | 6 +- .../Fixtures/IggyServerFixture.cs | 3 +- .../Fixtures/OffsetFixtures.cs | 8 +- .../Fixtures/PartitionsFixture.cs | 4 +- .../Fixtures/SendMessageFixture.cs | 4 +- .../Helpers/TopicFactory.cs | 7 +- .../Models/CreateTestTopic.cs | 51 +++++ .../Models/UpdateTestTopic.cs | 27 +++ .../PersonalAccessTokenTests.cs | 7 +- .../StreamsTests.cs | 9 +- .../Iggy_SDK.Tests.Integration/TopicsTests.cs | 24 ++- .../Iggy_SDK/Contracts/TopicResponse.cs | 3 +- .../IggyClient/IIggyPersonalAccessToken.cs | 4 +- .../csharp/Iggy_SDK/IggyClient/IIggyTopic.cs | 8 +- .../Implementations/HttpMessageStream.cs | 22 +- .../Implementations/TcpMessageStream.cs | 14 +- .../JsonConverters/TimeSpanConverter.cs | 42 ++++ .../csharp/Iggy_SDK/Mappers/BinaryMapper.cs | 15 +- .../Publishers/IggyPublisherBuilder.cs | 3 +- .../Publishers/IggyPublisherConfig.cs | 4 +- .../csharp/Iggy_SDK/Utils/DurationHelpers.cs | 60 ++++++ .../UtilityTests/DurationHelperTests.cs | 204 ++++++++++++++++++ 24 files changed, 467 insertions(+), 74 deletions(-) create mode 100644 foreign/csharp/Iggy_SDK.Tests.Integration/Models/CreateTestTopic.cs create mode 100644 foreign/csharp/Iggy_SDK.Tests.Integration/Models/UpdateTestTopic.cs create mode 100644 foreign/csharp/Iggy_SDK/JsonConverters/TimeSpanConverter.cs create mode 100644 foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs create mode 100644 foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs diff --git a/foreign/csharp/Benchmarks/Program.cs b/foreign/csharp/Benchmarks/Program.cs index bf721fb829..9934d15fe6 100644 --- a/foreign/csharp/Benchmarks/Program.cs +++ b/foreign/csharp/Benchmarks/Program.cs @@ -39,7 +39,7 @@ for (var i = 0; i < producerCount; i++) { - var bus = IggyClientFactory.CreateClient(new IggyClientConfigurator() + var bus = IggyClientFactory.CreateClient(new IggyClientConfigurator { BaseAddress = "127.0.0.1:8090", Protocol = Protocol.Tcp, @@ -67,9 +67,9 @@ await clients[0].CreateStreamAsync($"Test bench stream_{i}"); await clients[0].CreateTopicAsync(Identifier.Numeric(startingStreamId + i), - name: $"Test bench topic_{i}", + $"Test bench topic_{i}", compressionAlgorithm: CompressionAlgorithm.None, - messageExpiry: 0, + messageExpiry: TimeSpan.Zero, maxTopicSize: 2_000_000_000, replicationFactor: 3, partitionsCount: 1); diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs index e58e2742fa..5b2dd71a05 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FetchMessagesFixture.cs @@ -16,12 +16,12 @@ // // under the License. using System.Text; -using Apache.Iggy.Contracts.Http; using Apache.Iggy.Enums; using Apache.Iggy.Headers; using Apache.Iggy.IggyClient; using Apache.Iggy.Messages; using Apache.Iggy.Tests.Integrations.Helpers; +using Apache.Iggy.Tests.Integrations.Models; using TUnit.Core.Interfaces; using Partitioning = Apache.Iggy.Kinds.Partitioning; @@ -31,8 +31,8 @@ public class FetchMessagesFixture : IAsyncInitializer { internal readonly int MessageCount = 20; internal readonly string StreamId = "FetchMessagesStream"; - internal readonly CreateTopicRequest TopicHeadersRequest = TopicFactory.CreateTopic("HeadersTopic"); - internal readonly CreateTopicRequest TopicRequest = TopicFactory.CreateTopic("Topic"); + internal readonly CreateTestTopic TopicHeadersRequest = TopicFactory.CreateTopic("HeadersTopic"); + internal readonly CreateTestTopic TopicRequest = TopicFactory.CreateTopic("Topic"); [ClassDataSource(Shared = SharedType.PerAssembly)] public required IggyServerFixture IggyServerFixture { get; init; } diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FlushMessageFixture.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FlushMessageFixture.cs index 4823010366..511a4bcd5a 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FlushMessageFixture.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/FlushMessageFixture.cs @@ -15,12 +15,11 @@ // // specific language governing permissions and limitations // // under the License. -using Apache.Iggy.Contracts; -using Apache.Iggy.Contracts.Http; using Apache.Iggy.Enums; using Apache.Iggy.IggyClient; using Apache.Iggy.Messages; using Apache.Iggy.Tests.Integrations.Helpers; +using Apache.Iggy.Tests.Integrations.Models; using TUnit.Core.Interfaces; using Partitioning = Apache.Iggy.Kinds.Partitioning; @@ -29,7 +28,7 @@ namespace Apache.Iggy.Tests.Integrations.Fixtures; public class FlushMessageFixture : IAsyncInitializer { internal readonly string StreamId = "FlushMessageStream"; - internal readonly CreateTopicRequest TopicRequest = TopicFactory.CreateTopic("Topic"); + internal readonly CreateTestTopic TopicRequest = TopicFactory.CreateTopic("Topic"); [ClassDataSource(Shared = SharedType.PerAssembly)] public required IggyServerFixture IggyServerFixture { get; init; } @@ -53,7 +52,6 @@ await client.Value.CreateTopicAsync(Identifier.String(StreamId.GetWithProtocol(c }; await client.Value.SendMessagesAsync(Identifier.String(StreamId.GetWithProtocol(client.Key)), Identifier.String(TopicRequest.Name), Partitioning.None(), messages); - } } } diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyServerFixture.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyServerFixture.cs index 39922d2a2c..1c2fc9b47f 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyServerFixture.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/IggyServerFixture.cs @@ -46,7 +46,8 @@ public class IggyServerFixture : IAsyncInitializer, IAsyncDisposable { "IGGY_ROOT_USERNAME", "iggy" }, { "IGGY_ROOT_PASSWORD", "iggy" }, { "IGGY_TCP_ADDRESS", "0.0.0.0:8090" }, - { "IGGY_HTTP_ADDRESS", "0.0.0.0:3000" } + { "IGGY_HTTP_ADDRESS", "0.0.0.0:3000" }, + { "IGGY_SYSTEM_TOPIC_MESSAGE_EXPIRY", "10m" } }; /// diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/OffsetFixtures.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/OffsetFixtures.cs index 0867f97ea5..6e33f8cedf 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/OffsetFixtures.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/OffsetFixtures.cs @@ -15,12 +15,11 @@ // // specific language governing permissions and limitations // // under the License. -using Apache.Iggy.Contracts; -using Apache.Iggy.Contracts.Http; using Apache.Iggy.Enums; using Apache.Iggy.IggyClient; using Apache.Iggy.Messages; using Apache.Iggy.Tests.Integrations.Helpers; +using Apache.Iggy.Tests.Integrations.Models; using TUnit.Core.Interfaces; using Partitioning = Apache.Iggy.Kinds.Partitioning; @@ -29,7 +28,7 @@ namespace Apache.Iggy.Tests.Integrations.Fixtures; public class OffsetFixtures : IAsyncInitializer { internal readonly string StreamId = "OffsetStream"; - internal readonly CreateTopicRequest TopicRequest = TopicFactory.CreateTopic("Topic"); + internal readonly CreateTestTopic TopicRequest = TopicFactory.CreateTopic("Topic"); [ClassDataSource(Shared = SharedType.PerAssembly)] public required IggyServerFixture IggyServerFixture { get; init; } @@ -49,8 +48,7 @@ await client.Value.CreateTopicAsync(Identifier.String(StreamId.GetWithProtocol(c { new(Guid.NewGuid(), "Test message 1"u8.ToArray()), new(Guid.NewGuid(), "Test message 2"u8.ToArray()), - new(Guid.NewGuid(), "Test message 3"u8.ToArray()), - new(Guid.NewGuid(), "Test message 4"u8.ToArray()) + new(Guid.NewGuid(), "Test message 3"u8.ToArray()), new(Guid.NewGuid(), "Test message 4"u8.ToArray()) }; await client.Value.SendMessagesAsync(Identifier.String(StreamId.GetWithProtocol(client.Key)), Identifier.String(TopicRequest.Name), Partitioning.None(), messages); diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/PartitionsFixture.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/PartitionsFixture.cs index 873a7451dd..cdd5fb9a8b 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/PartitionsFixture.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/PartitionsFixture.cs @@ -15,10 +15,10 @@ // // specific language governing permissions and limitations // // under the License. -using Apache.Iggy.Contracts.Http; using Apache.Iggy.Enums; using Apache.Iggy.IggyClient; using Apache.Iggy.Tests.Integrations.Helpers; +using Apache.Iggy.Tests.Integrations.Models; using TUnit.Core.Interfaces; namespace Apache.Iggy.Tests.Integrations.Fixtures; @@ -26,7 +26,7 @@ namespace Apache.Iggy.Tests.Integrations.Fixtures; public class PartitionsFixture : IAsyncInitializer { internal readonly string StreamId = "PartitionsStream"; - internal readonly CreateTopicRequest TopicRequest = TopicFactory.CreateTopic("Topic"); + internal readonly CreateTestTopic TopicRequest = TopicFactory.CreateTopic("Topic"); [ClassDataSource(Shared = SharedType.PerAssembly)] public required IggyServerFixture IggyServerFixture { get; init; } diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/SendMessageFixture.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/SendMessageFixture.cs index f1347854af..ec7b984995 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/SendMessageFixture.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Fixtures/SendMessageFixture.cs @@ -15,10 +15,10 @@ // // specific language governing permissions and limitations // // under the License. -using Apache.Iggy.Contracts.Http; using Apache.Iggy.Enums; using Apache.Iggy.IggyClient; using Apache.Iggy.Tests.Integrations.Helpers; +using Apache.Iggy.Tests.Integrations.Models; using TUnit.Core.Interfaces; namespace Apache.Iggy.Tests.Integrations.Fixtures; @@ -26,7 +26,7 @@ namespace Apache.Iggy.Tests.Integrations.Fixtures; public class SendMessageFixture : IAsyncInitializer { internal readonly string StreamId = "SendMessageStream"; - internal readonly CreateTopicRequest TopicRequest = TopicFactory.CreateTopic("Topic"); + internal readonly CreateTestTopic TopicRequest = TopicFactory.CreateTopic("Topic"); [ClassDataSource(Shared = SharedType.PerAssembly)] public required IggyServerFixture IggyServerFixture { get; init; } diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Helpers/TopicFactory.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Helpers/TopicFactory.cs index 5959b7ba6a..52576c80b5 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/Helpers/TopicFactory.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Helpers/TopicFactory.cs @@ -15,15 +15,16 @@ // // specific language governing permissions and limitations // // under the License. -using Apache.Iggy.Contracts.Http; +using Apache.Iggy.Tests.Integrations.Models; namespace Apache.Iggy.Tests.Integrations.Helpers; public static class TopicFactory { - internal static CreateTopicRequest CreateTopic(string topicId, uint partitionsCount = 1, ulong messageExpiry = 0) + internal static CreateTestTopic CreateTopic(string topicId, uint partitionsCount = 1, + TimeSpan messageExpiry = default) { - return new CreateTopicRequest + return new CreateTestTopic { Name = topicId, PartitionsCount = partitionsCount, diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Models/CreateTestTopic.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Models/CreateTestTopic.cs new file mode 100644 index 0000000000..d58f123e7c --- /dev/null +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Models/CreateTestTopic.cs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Diagnostics.CodeAnalysis; +using Apache.Iggy.Enums; + +namespace Apache.Iggy.Tests.Integrations.Models; + +internal class CreateTestTopic +{ + public required string Name { get; set; } + public CompressionAlgorithm CompressionAlgorithm { get; set; } = CompressionAlgorithm.None; + public TimeSpan MessageExpiry { get; set; } = TimeSpan.Zero; + public uint PartitionsCount { get; set; } = 1; + public byte? ReplicationFactor { get; set; } = 1; + public ulong MaxTopicSize { get; set; } + + public CreateTestTopic() + { + } + + [SetsRequiredMembers] + public CreateTestTopic(string name, + CompressionAlgorithm compressionAlgorithm, + TimeSpan messageExpiry, + uint partitionsCount, + byte? replicationFactor, + ulong maxTopicSize) + { + Name = name; + CompressionAlgorithm = compressionAlgorithm; + MessageExpiry = messageExpiry; + PartitionsCount = partitionsCount; + ReplicationFactor = replicationFactor; + MaxTopicSize = maxTopicSize; + } +} diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/Models/UpdateTestTopic.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/Models/UpdateTestTopic.cs new file mode 100644 index 0000000000..fbdcb5bb81 --- /dev/null +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/Models/UpdateTestTopic.cs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Apache.Iggy.Enums; + +namespace Apache.Iggy.Tests.Integrations.Models; + +internal record UpdateTestTopic( + string Name, + CompressionAlgorithm CompressionAlgorithm, + ulong MaxTopicSize, + TimeSpan MessageExpiry, + byte? ReplicationFactor); diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/PersonalAccessTokenTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/PersonalAccessTokenTests.cs index ec7d90f749..b2a284c710 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/PersonalAccessTokenTests.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/PersonalAccessTokenTests.cs @@ -18,7 +18,6 @@ using Apache.Iggy.Contracts.Auth; using Apache.Iggy.Enums; using Apache.Iggy.Exceptions; -using Apache.Iggy.Tests.Integrations.Attributes; using Apache.Iggy.Tests.Integrations.Fixtures; using Shouldly; @@ -27,7 +26,7 @@ namespace Apache.Iggy.Tests.Integrations; public class PersonalAccessTokenTests { private const string Name = "test-pat"; - private const ulong Expiry = 100_000_000; + private static readonly TimeSpan Expiry = TimeSpan.FromHours(1); [ClassDataSource(Shared = SharedType.PerClass)] public required PersonalAccessTokenFixture Fixture { get; init; } @@ -64,7 +63,7 @@ IReadOnlyList response response.ShouldNotBeNull(); response.Count.ShouldBe(1); response[0].Name.ShouldBe(Name); - var tokenExpiryDateTimeOffset = DateTimeOffset.UtcNow.AddMicroseconds(Expiry); + var tokenExpiryDateTimeOffset = DateTimeOffset.UtcNow.Add(Expiry); response[0].ExpiryAt!.Value.ToUniversalTime().ShouldBe(tokenExpiryDateTimeOffset, TimeSpan.FromMinutes(1)); } @@ -73,7 +72,7 @@ IReadOnlyList response [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] public async Task LoginWithPersonalAccessToken_Should_Be_Successfully(Protocol protocol) { - var response = await Fixture.Clients[protocol].CreatePersonalAccessTokenAsync("test-pat-login", 100_000_000); + var response = await Fixture.Clients[protocol].CreatePersonalAccessTokenAsync("test-pat-login", Expiry); var client = await Fixture.IggyServerFixture.CreateClient(protocol); diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/StreamsTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/StreamsTests.cs index 32c9b26ed6..6caa0b530c 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/StreamsTests.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/StreamsTests.cs @@ -19,11 +19,9 @@ using Apache.Iggy.Enums; using Apache.Iggy.Exceptions; using Apache.Iggy.Messages; -using Apache.Iggy.Tests.Integrations.Attributes; using Apache.Iggy.Tests.Integrations.Fixtures; using Apache.Iggy.Tests.Integrations.Helpers; using Shouldly; -using TUnit.Core.Logging; using Partitioning = Apache.Iggy.Kinds.Partitioning; namespace Apache.Iggy.Tests.Integrations; @@ -121,8 +119,8 @@ public async Task GetStreams_ByStreamName_Should_ReturnValidResponse(Protocol pr [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] public async Task GetStreamById_WithTopics_Should_ReturnValidResponse(Protocol protocol) { - var topicRequest1 = TopicFactory.CreateTopic("Topic1", messageExpiry: 100_000); - var topicRequest2 = TopicFactory.CreateTopic("Topic2", messageExpiry: 100_000); + var topicRequest1 = TopicFactory.CreateTopic("Topic1", messageExpiry: TimeSpan.FromHours(1)); + var topicRequest2 = TopicFactory.CreateTopic("Topic2", messageExpiry: TimeSpan.FromHours(1)); await Fixture.Clients[protocol].CreateTopicAsync(Identifier.String(Name.GetWithProtocol(protocol)), topicRequest1.Name, topicRequest1.PartitionsCount, messageExpiry: topicRequest1.MessageExpiry); @@ -230,7 +228,8 @@ await Should.NotThrowAsync(() => public async Task DeleteStream_NotExists_Should_Throw_InvalidResponse(Protocol protocol) { await Should.ThrowAsync(() => - Fixture.Clients[protocol].DeleteStreamAsync(Identifier.String("stream-to-delete".GetWithProtocol(protocol)))); + Fixture.Clients[protocol] + .DeleteStreamAsync(Identifier.String("stream-to-delete".GetWithProtocol(protocol)))); } [Test] diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/TopicsTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/TopicsTests.cs index 4bb55b42de..d516e790ee 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/TopicsTests.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/TopicsTests.cs @@ -17,13 +17,12 @@ using System.Text; using Apache.Iggy.Contracts; -using Apache.Iggy.Contracts.Http; using Apache.Iggy.Enums; using Apache.Iggy.Exceptions; using Apache.Iggy.Messages; -using Apache.Iggy.Tests.Integrations.Attributes; using Apache.Iggy.Tests.Integrations.Fixtures; using Apache.Iggy.Tests.Integrations.Helpers; +using Apache.Iggy.Tests.Integrations.Models; using Shouldly; using Partitioning = Apache.Iggy.Kinds.Partitioning; @@ -31,14 +30,14 @@ namespace Apache.Iggy.Tests.Integrations; public class TopicsTests { - private static readonly CreateTopicRequest TopicRequest = new("Test Topic", CompressionAlgorithm.Gzip, 1000, 1, - 2, 2_000_000_000); + private static readonly CreateTestTopic TopicRequest = new("Test Topic", CompressionAlgorithm.Gzip, + TimeSpan.FromMinutes(10), 1, 2, 2_000_000_000); - private static readonly CreateTopicRequest TopicRequestSecond - = new("Test Topic 2", CompressionAlgorithm.Gzip, 1000, 1, 2, 2_000_000_000); + private static readonly CreateTestTopic TopicRequestSecond + = new("Test Topic 2", CompressionAlgorithm.Gzip, TimeSpan.FromMinutes(10), 1, 2, 2_000_000_000); - private static readonly UpdateTopicRequest UpdateTopicRequest - = new("Updated Topic", CompressionAlgorithm.Gzip, 3_000_000_000, 2000, 3); + private static readonly UpdateTestTopic UpdateTopicRequest + = new("Updated Topic", CompressionAlgorithm.Gzip, 3_000_000_000, TimeSpan.FromMinutes(10), 3); [ClassDataSource(Shared = SharedType.PerClass)] public required TopicsFixture Fixture { get; init; } @@ -173,18 +172,21 @@ await Fixture.Clients[protocol].CreateTopicAsync(Identifier.String(Fixture.Strea public async Task Get_Topic_WithPartitions_Should_ReturnValidResponse(Protocol protocol) { await Fixture.Clients[protocol] - .CreatePartitionsAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), Identifier.String(TopicRequest.Name), + .CreatePartitionsAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), + Identifier.String(TopicRequest.Name), 2); for (var i = 0; i < 3; i++) { await Fixture.Clients[protocol] - .SendMessagesAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), Identifier.String(TopicRequest.Name), + .SendMessagesAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), + Identifier.String(TopicRequest.Name), Partitioning.None(), GetMessages(i + 2)); } var response = await Fixture.Clients[protocol] - .GetTopicByIdAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), Identifier.String(TopicRequest.Name)); + .GetTopicByIdAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), + Identifier.String(TopicRequest.Name)); response.ShouldNotBeNull(); response.Id.ShouldBeGreaterThanOrEqualTo(0u); diff --git a/foreign/csharp/Iggy_SDK/Contracts/TopicResponse.cs b/foreign/csharp/Iggy_SDK/Contracts/TopicResponse.cs index ebec03c6be..68297de8ac 100644 --- a/foreign/csharp/Iggy_SDK/Contracts/TopicResponse.cs +++ b/foreign/csharp/Iggy_SDK/Contracts/TopicResponse.cs @@ -57,7 +57,8 @@ public sealed class TopicResponse /// /// Message expiry in milliseconds. /// - public ulong MessageExpiry { get; init; } + [JsonConverter(typeof(TimeSpanConverter))] + public TimeSpan MessageExpiry { get; init; } /// /// Maximum topic size in bytes. diff --git a/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs b/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs index 023b02d7d8..54ab7f5fe9 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs @@ -39,13 +39,13 @@ public interface IIggyPersonalAccessToken /// Creates a new personal access token for the current user. /// /// The name to identify this token. - /// The expiration time in milliseconds from now (optional, null means no expiration). + /// The expiration time from now (optional, null means no expiration). /// The cancellation token to cancel the operation. /// /// A task that represents the asynchronous operation and returns the created personal access token with its /// secret value, or null if creation failed. /// - Task CreatePersonalAccessTokenAsync(string name, ulong? expiry = null, + Task CreatePersonalAccessTokenAsync(string name, TimeSpan? expiry = null, CancellationToken token = default); /// diff --git a/foreign/csharp/Iggy_SDK/IggyClient/IIggyTopic.cs b/foreign/csharp/Iggy_SDK/IggyClient/IIggyTopic.cs index db8884af38..5298d25bdc 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/IIggyTopic.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/IIggyTopic.cs @@ -58,7 +58,7 @@ public interface IIggyTopic /// The number of partitions for the topic (max 1000). /// The compression algorithm to use for messages (default: None). /// The replication factor for the topic (optional). - /// The message expiry period in milliseconds (0 = never expire). + /// The message expiry period (0 for server default, MaxValue for never expire). /// The maximum size of the topic in bytes (0 = unlimited). /// The cancellation token to cancel the operation. /// @@ -67,7 +67,7 @@ public interface IIggyTopic /// Task CreateTopicAsync(Identifier streamId, string name, uint partitionsCount, CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, byte? replicationFactor = null, - ulong messageExpiry = 0, ulong maxTopicSize = 0, CancellationToken token = default); + TimeSpan? messageExpiry = null, ulong maxTopicSize = 0, CancellationToken token = default); /// /// Updates the configuration of an existing topic. @@ -81,13 +81,13 @@ public interface IIggyTopic /// The new name for the topic (max 255 characters). /// The new compression algorithm to use (default: None). /// The new maximum size of the topic in bytes (0 = unlimited). - /// The new message expiry period in milliseconds (0 = never expire). + /// The new message expiry period (0 for server default, MaxValue for never expire). /// The new replication factor (optional). /// The cancellation token to cancel the operation. /// A task that represents the asynchronous operation. Task UpdateTopicAsync(Identifier streamId, Identifier topicId, string name, CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, ulong maxTopicSize = 0, - ulong messageExpiry = 0, byte? replicationFactor = null, CancellationToken token = default); + TimeSpan? messageExpiry = null, byte? replicationFactor = null, CancellationToken token = default); /// /// Deletes an existing topic and all its associated messages and partitions. diff --git a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs index 4e46b47dec..debf2b54bb 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs @@ -30,6 +30,7 @@ using Apache.Iggy.Kinds; using Apache.Iggy.Messages; using Apache.Iggy.StringHandlers; +using Apache.Iggy.Utils; using Partitioning = Apache.Iggy.Kinds.Partitioning; namespace Apache.Iggy.IggyClient.Implementations; @@ -55,10 +56,7 @@ internal HttpMessageStream(HttpClient httpClient) _jsonSerializerOptions = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, - Converters = - { - new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower) - } + Converters = { new JsonStringEnumConverter(JsonNamingPolicy.SnakeCaseLower) } }; } @@ -147,15 +145,15 @@ public async Task> GetStreamsAsync(CancellationTok /// public async Task CreateTopicAsync(Identifier streamId, string name, uint partitionsCount, CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, byte? replicationFactor = null, - ulong messageExpiry = 0, ulong maxTopicSize = 0, - CancellationToken token = default) + TimeSpan? messageExpiry = null, ulong maxTopicSize = 0, CancellationToken token = default) { + var messageExpiryValue = (ulong)(messageExpiry?.TotalMicroseconds ?? 0); var json = JsonSerializer.Serialize(new CreateTopicRequest { Name = name, CompressionAlgorithm = compressionAlgorithm, MaxTopicSize = maxTopicSize, - MessageExpiry = messageExpiry, + MessageExpiry = messageExpiryValue, PartitionsCount = partitionsCount, ReplicationFactor = replicationFactor }, _jsonSerializerOptions); @@ -176,11 +174,12 @@ public async Task> GetStreamsAsync(CancellationTok /// public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, string name, CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, - ulong maxTopicSize = 0, ulong messageExpiry = 0, byte? replicationFactor = null, + ulong maxTopicSize = 0, TimeSpan? messageExpiry = null, byte? replicationFactor = null, CancellationToken token = default) { + var messageExpiryValue = (ulong)(messageExpiry?.TotalMicroseconds ?? 0); var json = JsonSerializer.Serialize( - new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize, messageExpiry, replicationFactor), + new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize, messageExpiryValue, replicationFactor), _jsonSerializerOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/streams/{streamId}/topics/{topicId}", data, token); @@ -708,10 +707,11 @@ public async Task> GetPersonalAccessT } /// - public async Task CreatePersonalAccessTokenAsync(string name, ulong? expiry = null, + public async Task CreatePersonalAccessTokenAsync(string name, TimeSpan? expiry = null, CancellationToken token = default) { - var json = JsonSerializer.Serialize(new CreatePersonalAccessTokenRequest(name, expiry), _jsonSerializerOptions); + var json = JsonSerializer.Serialize( + new CreatePersonalAccessTokenRequest(name, DurationHelpers.ToDuration(expiry)), _jsonSerializerOptions); var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync("/personal-access-tokens", content, token); diff --git a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs index 3425b1fa06..1d7c665a67 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs @@ -209,10 +209,11 @@ public async Task> GetTopicsAsync(Identifier stream /// public async Task CreateTopicAsync(Identifier streamId, string name, uint partitionsCount, CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, byte? replicationFactor = null, - ulong messageExpiry = 0, ulong maxTopicSize = 0, CancellationToken token = default) + TimeSpan? messageExpiry = null, ulong maxTopicSize = 0, CancellationToken token = default) { + var messageExpiryValue = DurationHelpers.ToDuration(messageExpiry); var message = TcpContracts.CreateTopic(streamId, name, partitionsCount, compressionAlgorithm, - replicationFactor, messageExpiry, maxTopicSize); + replicationFactor, messageExpiryValue, maxTopicSize); var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_TOPIC_CODE); @@ -229,11 +230,12 @@ public async Task> GetTopicsAsync(Identifier stream /// public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, string name, CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, - ulong maxTopicSize = 0, ulong messageExpiry = 0, byte? replicationFactor = null, + ulong maxTopicSize = 0, TimeSpan? messageExpiry = null, byte? replicationFactor = null, CancellationToken token = default) { + var messageExpiryValue = DurationHelpers.ToDuration(messageExpiry); var message = TcpContracts.UpdateTopic(streamId, topicId, name, compressionAlgorithm, maxTopicSize, - messageExpiry, replicationFactor); + messageExpiryValue, replicationFactor); var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_TOPIC_CODE); @@ -745,10 +747,10 @@ public async Task> GetPersonalAccessT } /// - public async Task CreatePersonalAccessTokenAsync(string name, ulong? expiry = 0, + public async Task CreatePersonalAccessTokenAsync(string name, TimeSpan? expiry = null, CancellationToken token = default) { - var message = TcpContracts.CreatePersonalAccessToken(name, expiry); + var message = TcpContracts.CreatePersonalAccessToken(name, DurationHelpers.ToDuration(expiry)); var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PERSONAL_ACCESS_TOKEN_CODE); diff --git a/foreign/csharp/Iggy_SDK/JsonConverters/TimeSpanConverter.cs b/foreign/csharp/Iggy_SDK/JsonConverters/TimeSpanConverter.cs new file mode 100644 index 0000000000..842ab63cfa --- /dev/null +++ b/foreign/csharp/Iggy_SDK/JsonConverters/TimeSpanConverter.cs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Text.Json; +using System.Text.Json.Serialization; +using Apache.Iggy.Utils; + +namespace Apache.Iggy.JsonConverters; + +internal sealed class TimeSpanConverter : JsonConverter +{ + public override TimeSpan Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.Number) + { + throw new JsonException("Expected a number token for TimeSpan conversion."); + } + + var microseconds = reader.GetUInt64(); + return DurationHelpers.FromDuration(microseconds); + } + + public override void Write(Utf8JsonWriter writer, TimeSpan value, JsonSerializerOptions options) + { + var microseconds = DurationHelpers.ToDuration(value); + writer.WriteNumberValue(microseconds); + } +} diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs index 576204e4e7..994f9b761e 100644 --- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs +++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs @@ -24,6 +24,7 @@ using Apache.Iggy.Extensions; using Apache.Iggy.Headers; using Apache.Iggy.Messages; +using Apache.Iggy.Utils; namespace Apache.Iggy.Mappers; @@ -456,8 +457,16 @@ private static Dictionary MapHeaders(ReadOnlySpan ReadOnlySpan value = payload[position..(position + valueLength)]; position += valueLength; - headers[new HeaderKey { Kind = keyKind, Value = keyValue }] = - new HeaderValue { Kind = valueKind, Value = value.ToArray() }; + headers[new HeaderKey + { + Kind = keyKind, + Value = keyValue + }] = + new HeaderValue + { + Kind = valueKind, + Value = value.ToArray() + }; } return headers; @@ -622,7 +631,7 @@ private static (TopicResponse topic, int readBytes) MapToTopic(ReadOnlySpanThe builder instance for method chaining. public IggyPublisherBuilder CreateTopicIfNotExists(string name, uint topicPartitionsCount = 1, CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, byte? replicationFactor = null, - ulong messageExpiry = 0, ulong maxTopicSize = 0) + TimeSpan messageExpiry = default, ulong maxTopicSize = 0) { Config.CreateTopic = true; Config.TopicName = name; @@ -216,7 +216,6 @@ public IggyPublisherBuilder SubscribeOnMessageBatchFailed(Func /// Configures retry behavior for failed message sends. /// Uses exponential backoff with configurable parameters. diff --git a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs index 3419f2c333..b3d0ce921f 100644 --- a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs +++ b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs @@ -166,11 +166,11 @@ public class IggyPublisherConfig public byte? TopicReplicationFactor { get; set; } /// - /// Gets or sets the message expiry time in seconds (0 for no expiry). + /// Gets or sets the message expiry time (0 for no expiry). /// Messages older than this will be automatically deleted. /// Only used when is true. /// - public ulong TopicMessageExpiry { get; set; } + public TimeSpan TopicMessageExpiry { get; set; } = TimeSpan.Zero; /// /// Gets or sets the maximum size of the topic in bytes (0 for unlimited). diff --git a/foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs b/foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs new file mode 100644 index 0000000000..5f8ec9bcb6 --- /dev/null +++ b/foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Apache.Iggy.Utils; + +/// +/// Converts between Iggy durations (ulong microseconds) and .NET TimeSpan. +/// +/// +/// Iggy stores durations (e.g., message expiry) as ulong microseconds, but TimeSpan uses long ticks internally. +/// Since ulong.MaxValue exceeds what TimeSpan can represent from microseconds, we map ulong.MaxValue to +/// TimeSpan.MaxValue and vice versa. +/// +public static class DurationHelpers +{ + /// + /// Converts microseconds to TimeSpan. Returns TimeSpan.MaxValue if duration exceeds representable range. + /// + public static TimeSpan FromDuration(ulong duration) + { + if (duration > long.MaxValue) + { + return TimeSpan.MaxValue; + } + + return TimeSpan.FromMicroseconds(duration); + } + + /// + /// Converts TimeSpan to microseconds. Returns ulong.MaxValue if TimeSpan.MaxValue is passed. + /// + public static ulong ToDuration(TimeSpan? duration) + { + if (duration == null) + { + return 0; + } + + if (duration == TimeSpan.MaxValue) + { + return ulong.MaxValue; + } + + return (ulong)duration.Value.TotalMicroseconds; + } +} diff --git a/foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs b/foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs new file mode 100644 index 0000000000..a5b9709710 --- /dev/null +++ b/foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Apache.Iggy.Utils; + +namespace Apache.Iggy.Tests.UtilityTests; + +public sealed class DurationHelperTests +{ + [Fact] + public void FromDuration_Zero_ReturnsZeroTimeSpan() + { + var result = DurationHelpers.FromDuration(0); + + Assert.Equal(TimeSpan.Zero, result); + } + + [Fact] + public void FromDuration_NormalValue_ReturnsCorrectTimeSpan() + { + ulong microseconds = 1_000_000; + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(TimeSpan.FromSeconds(1), result); + } + + [Fact] + public void FromDuration_MillisecondsValue_ReturnsCorrectTimeSpan() + { + ulong microseconds = 5_000; + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(TimeSpan.FromMilliseconds(5), result); + } + + [Fact] + public void FromDuration_LargeValidValue_ReturnsCorrectTimeSpan() + { + var microseconds = (ulong)TimeSpan.MaxValue.TotalMicroseconds; + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(TimeSpan.FromMicroseconds(microseconds), result); + } + + [Fact] + public void FromDuration_ValueExceedsLongMaxValue_ReturnsTimeSpanMaxValue() + { + var microseconds = (ulong)long.MaxValue + 1; + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(TimeSpan.MaxValue, result); + } + + [Fact] + public void FromDuration_UlongMaxValue_ReturnsTimeSpanMaxValue() + { + var result = DurationHelpers.FromDuration(ulong.MaxValue); + + Assert.Equal(TimeSpan.MaxValue, result); + } + + [Fact] + public void ToDuration_Null_ReturnsZero() + { + var result = DurationHelpers.ToDuration(null); + + Assert.Equal(0UL, result); + } + + [Fact] + public void ToDuration_ZeroTimeSpan_ReturnsZero() + { + var result = DurationHelpers.ToDuration(TimeSpan.Zero); + + Assert.Equal(0UL, result); + } + + [Fact] + public void ToDuration_NormalValue_ReturnsCorrectMicroseconds() + { + var duration = TimeSpan.FromSeconds(1); + var result = DurationHelpers.ToDuration(duration); + + Assert.Equal(1_000_000UL, result); + } + + [Fact] + public void ToDuration_MillisecondsValue_ReturnsCorrectMicroseconds() + { + var duration = TimeSpan.FromMilliseconds(5); + var result = DurationHelpers.ToDuration(duration); + + Assert.Equal(5_000UL, result); + } + + [Fact] + public void ToDuration_TimeSpanMaxValue_ReturnsUlongMaxValue() + { + var result = DurationHelpers.ToDuration(TimeSpan.MaxValue); + + Assert.Equal(ulong.MaxValue, result); + } + + [Fact] + public void ToDuration_LargeValue_ReturnsCorrectMicroseconds() + { + var duration = TimeSpan.FromDays(365); + var result = DurationHelpers.ToDuration(duration); + + var expectedMicroseconds = (ulong)(365 * 24 * 60 * 60 * 1_000_000L); + Assert.Equal(expectedMicroseconds, result); + } + + [Fact] + public void RoundTrip_NormalValue_PreservesValue() + { + ulong originalMicroseconds = 1_000_000; + var timeSpan = DurationHelpers.FromDuration(originalMicroseconds); + var result = DurationHelpers.ToDuration(timeSpan); + + Assert.Equal(originalMicroseconds, result); + } + + [Fact] + public void RoundTrip_Zero_PreservesValue() + { + ulong originalMicroseconds = 0; + var timeSpan = DurationHelpers.FromDuration(originalMicroseconds); + var result = DurationHelpers.ToDuration(timeSpan); + + Assert.Equal(originalMicroseconds, result); + } + + [Fact] + public void RoundTrip_UlongMaxValue_PreservesValue() + { + var originalMicroseconds = ulong.MaxValue; + var timeSpan = DurationHelpers.FromDuration(originalMicroseconds); + var result = DurationHelpers.ToDuration(timeSpan); + + Assert.Equal(originalMicroseconds, result); + } + + [Fact] + public void RoundTrip_TimeSpan_PreservesValue() + { + var originalTimeSpan = TimeSpan.FromHours(2); + var microseconds = DurationHelpers.ToDuration(originalTimeSpan); + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(originalTimeSpan, result); + } + + [Fact] + public void RoundTrip_TimeSpanMaxValue_PreservesValue() + { + var originalTimeSpan = TimeSpan.MaxValue; + var microseconds = DurationHelpers.ToDuration(originalTimeSpan); + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(originalTimeSpan, result); + } + + [Theory] + [InlineData(1UL)] + [InlineData(1000UL)] + [InlineData(1_000_000UL)] + [InlineData(60_000_000UL)] + [InlineData(3_600_000_000UL)] + public void FromDuration_VariousValues_ReturnsExpectedTimeSpan(ulong microseconds) + { + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(TimeSpan.FromMicroseconds(microseconds), result); + } + + [Theory] + [InlineData(1)] + [InlineData(100)] + [InlineData(1000)] + [InlineData(60000)] + [InlineData(3600000)] + public void ToDuration_VariousMilliseconds_ReturnsExpectedMicroseconds(int milliseconds) + { + var duration = TimeSpan.FromMilliseconds(milliseconds); + var result = DurationHelpers.ToDuration(duration); + + Assert.Equal((ulong)milliseconds * 1000, result); + } +} From e3086ed71eea5caf23f247805339b143aa0d4776 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Zborek?= Date: Thu, 5 Feb 2026 21:28:53 +0100 Subject: [PATCH 2/4] refactor(csharp): simplify message expiry handling and update documentation --- .../Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs | 2 +- .../IggyClient/Implementations/HttpMessageStream.cs | 6 ++---- foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs | 12 ++---------- .../Iggy_SDK/Publishers/IggyPublisherConfig.cs | 2 +- 4 files changed, 6 insertions(+), 16 deletions(-) diff --git a/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs b/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs index 54ab7f5fe9..661c294d91 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/IIggyPersonalAccessToken.cs @@ -39,7 +39,7 @@ public interface IIggyPersonalAccessToken /// Creates a new personal access token for the current user. /// /// The name to identify this token. - /// The expiration time from now (optional, null means no expiration). + /// The expiration time from now (optional, null means server default). /// The cancellation token to cancel the operation. /// /// A task that represents the asynchronous operation and returns the created personal access token with its diff --git a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs index debf2b54bb..73db25c1db 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs @@ -147,13 +147,12 @@ public async Task> GetStreamsAsync(CancellationTok CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.None, byte? replicationFactor = null, TimeSpan? messageExpiry = null, ulong maxTopicSize = 0, CancellationToken token = default) { - var messageExpiryValue = (ulong)(messageExpiry?.TotalMicroseconds ?? 0); var json = JsonSerializer.Serialize(new CreateTopicRequest { Name = name, CompressionAlgorithm = compressionAlgorithm, MaxTopicSize = maxTopicSize, - MessageExpiry = messageExpiryValue, + MessageExpiry = DurationHelpers.ToDuration(messageExpiry), PartitionsCount = partitionsCount, ReplicationFactor = replicationFactor }, _jsonSerializerOptions); @@ -177,9 +176,8 @@ public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, stri ulong maxTopicSize = 0, TimeSpan? messageExpiry = null, byte? replicationFactor = null, CancellationToken token = default) { - var messageExpiryValue = (ulong)(messageExpiry?.TotalMicroseconds ?? 0); var json = JsonSerializer.Serialize( - new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize, messageExpiryValue, replicationFactor), + new UpdateTopicRequest(name, compressionAlgorithm, maxTopicSize, DurationHelpers.ToDuration(messageExpiry), replicationFactor), _jsonSerializerOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/streams/{streamId}/topics/{topicId}", data, token); diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs index 994f9b761e..6c3435d2a5 100644 --- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs +++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs @@ -457,16 +457,8 @@ private static Dictionary MapHeaders(ReadOnlySpan ReadOnlySpan value = payload[position..(position + valueLength)]; position += valueLength; - headers[new HeaderKey - { - Kind = keyKind, - Value = keyValue - }] = - new HeaderValue - { - Kind = valueKind, - Value = value.ToArray() - }; + headers[new HeaderKey { Kind = keyKind, Value = keyValue }] = + new HeaderValue { Kind = valueKind, Value = value.ToArray() }; } return headers; diff --git a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs index b3d0ce921f..2d44fa7894 100644 --- a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs +++ b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisherConfig.cs @@ -166,7 +166,7 @@ public class IggyPublisherConfig public byte? TopicReplicationFactor { get; set; } /// - /// Gets or sets the message expiry time (0 for no expiry). + /// Gets or sets the message expiry time (0 for server default, TimeSpan.MaxValue for no expiry). /// Messages older than this will be automatically deleted. /// Only used when is true. /// From 786c9cd8c389c77570bfc82fa078cd0439d5ec62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Zborek?= Date: Thu, 5 Feb 2026 21:43:49 +0100 Subject: [PATCH 3/4] chore(csharp): update package version to 0.6.3-edge.2 --- foreign/csharp/Iggy_SDK/Iggy_SDK.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj index 634855ae12..6d794d291c 100644 --- a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj +++ b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj @@ -7,7 +7,7 @@ net8.0;net10.0 Apache.Iggy Apache.Iggy - 0.6.3-edge.1 + 0.6.3-edge.2 true From 176de28c9b314202a4a2b2142e6f0a73aff32ead Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Zborek?= Date: Thu, 5 Feb 2026 21:57:22 +0100 Subject: [PATCH 4/4] refactor(csharp): update FromDuration method to improve boundary checks and add unit tests --- .../csharp/Iggy_SDK/Utils/DurationHelpers.cs | 2 +- .../UtilityTests/DurationHelperTests.cs | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs b/foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs index 5f8ec9bcb6..12a19d7c68 100644 --- a/foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs +++ b/foreign/csharp/Iggy_SDK/Utils/DurationHelpers.cs @@ -32,7 +32,7 @@ public static class DurationHelpers /// public static TimeSpan FromDuration(ulong duration) { - if (duration > long.MaxValue) + if (duration >= long.MaxValue / TimeSpan.TicksPerMicrosecond) { return TimeSpan.MaxValue; } diff --git a/foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs b/foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs index a5b9709710..7e44b7415f 100644 --- a/foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs +++ b/foreign/csharp/Iggy_SDK_Tests/UtilityTests/DurationHelperTests.cs @@ -65,6 +65,33 @@ public void FromDuration_ValueExceedsLongMaxValue_ReturnsTimeSpanMaxValue() Assert.Equal(TimeSpan.MaxValue, result); } + [Fact] + public void FromDuration_ValueLongMaxValueDividedBy100_ReturnsNotTimeSpanMaxValue() + { + var microseconds = (ulong)long.MaxValue / 100; + var result = DurationHelpers.FromDuration(microseconds); + + Assert.NotEqual(TimeSpan.MaxValue, result); + } + + [Fact] + public void FromDuration_ValueLongMaxValueDividedByTicksPerMicrosecond_ReturnsTimeSpanMaxValue() + { + var microseconds = (ulong)long.MaxValue / TimeSpan.TicksPerMicrosecond; + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(TimeSpan.MaxValue, result); + } + + [Fact] + public void FromDuration_ValueLongMaxValue_ReturnsTimeSpanMaxValue() + { + var microseconds = (ulong)long.MaxValue; + var result = DurationHelpers.FromDuration(microseconds); + + Assert.Equal(TimeSpan.MaxValue, result); + } + [Fact] public void FromDuration_UlongMaxValue_ReturnsTimeSpanMaxValue() {