diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs index 4a32eff1f7..153e7cea6a 100644 --- a/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/FetchMessagesTests.cs @@ -139,6 +139,36 @@ public async Task PollMessages_WithHeaders_Should_PollMessages_Successfully(Prot } } + [Test] + [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] + public async Task PollMessages_WithHeaders_Should_AutoParseUserHeadersFromRaw(Protocol protocol) + { + var (client, streamName) = await CreateStreamWithMessages(protocol); + + var response = await client.PollMessagesAsync(new MessageFetchRequest + { + Count = 1, + AutoCommit = true, + Consumer = Consumer.New(2), + PartitionId = 0, + PollingStrategy = PollingStrategy.First(), + StreamId = Identifier.String(streamName), + TopicId = Identifier.String(HeadersTopicName) + }); + + response.Messages.Count.ShouldBe(1); + var msg = response.Messages[0]; + + Dictionary? parsed = msg.UserHeaders; + parsed.ShouldNotBeNull(); + parsed!.Count.ShouldBe(2); + parsed[HeaderKey.FromString("header1")].ToString().ShouldBe("value1"); + parsed[HeaderKey.FromString("header2")].ToInt32().ShouldBe(14); + + Dictionary? second = msg.UserHeaders; + second.ShouldBeSameAs(parsed); + } + private static Message[] CreateMessagesWithoutHeader(int count) { var messages = new List(); diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/IggyTypedConsumerTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/IggyTypedConsumerTests.cs new file mode 100644 index 0000000000..3861ae812f --- /dev/null +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/IggyTypedConsumerTests.cs @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Text; +using Apache.Iggy.Consumers; +using Apache.Iggy.Enums; +using Apache.Iggy.Exceptions; +using Apache.Iggy.IggyClient; +using Apache.Iggy.Kinds; +using Apache.Iggy.Messages; +using Apache.Iggy.Tests.Integrations.Fixtures; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Partitioning = Apache.Iggy.Kinds.Partitioning; + +namespace Apache.Iggy.Tests.Integrations; + +public class IggyTypedConsumerTests +{ + [ClassDataSource(Shared = SharedType.PerAssembly)] + public required IggyServerFixture Fixture { get; init; } + + [Test] + [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] + public async Task ReceiveDeserializedAsync_Should_YieldMessages_WithCorrectData(Protocol protocol) + { + var client = protocol == Protocol.Tcp + ? await Fixture.CreateTcpClient() + : await Fixture.CreateHttpClient(); + + var testStream = await CreateTestStreamWithMessages(client, protocol); + + var consumerId = protocol == Protocol.Tcp ? 200 : 300; + IggyConsumer consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId), + AutoCommitMode.Disabled, new Utf8StringDeserializer()); + + await consumer.InitAsync(); + + var received = new List>(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await foreach (ReceivedMessage msg in consumer.ReceiveDeserializedAsync(cts.Token)) + { + msg.ShouldNotBeNull(); + msg.Status.ShouldBe(MessageStatus.Success); + msg.Data.ShouldNotBeNull(); + msg.Data.ShouldStartWith("Test message"); + msg.PartitionId.ShouldBe(1u); + msg.Error.ShouldBeNull(); + received.Add(msg); + if (received.Count >= 10) + { + break; + } + } + + received.Count.ShouldBeGreaterThanOrEqualTo(10); + await consumer.DisposeAsync(); + } + + [Test] + [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] + public async Task ReceiveDeserializedAsync_WithoutInit_Should_Throw_ConsumerNotInitializedException( + Protocol protocol) + { + var client = protocol == Protocol.Tcp + ? await Fixture.CreateTcpClient() + : await Fixture.CreateHttpClient(); + + var testStream = await CreateTestStreamWithMessages(client, protocol); + + var consumerId = protocol == Protocol.Tcp ? 201 : 301; + IggyConsumer consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId), + AutoCommitMode.Disabled, new Utf8StringDeserializer()); + + await Should.ThrowAsync(async () => + { + await foreach (ReceivedMessage _ in consumer.ReceiveDeserializedAsync()) + { + } + }); + + await consumer.DisposeAsync(); + } + + [Test] + [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] + public async Task ReceiveDeserializedAsync_WithAutoCommitAfterReceive_Should_StoreOffset(Protocol protocol) + { + var client = protocol == Protocol.Tcp + ? await Fixture.CreateTcpClient() + : await Fixture.CreateHttpClient(); + + var testStream = await CreateTestStreamWithMessages(client, protocol); + + var consumerId = protocol == Protocol.Tcp ? 202 : 302; + IggyConsumer consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId), + AutoCommitMode.AfterReceive, new Utf8StringDeserializer(), + PollingStrategy.First()); + + await consumer.InitAsync(); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var count = 0; + + await foreach (ReceivedMessage _ in consumer.ReceiveDeserializedAsync(cts.Token)) + { + count++; + if (count >= 5) + { + break; + } + } + + await consumer.DisposeAsync(); + + var offset = await client.GetOffsetAsync(Consumer.New(consumerId), + Identifier.String(testStream.StreamId), + Identifier.String(testStream.TopicId), + 1u); + + offset.ShouldNotBeNull(); + offset.StoredOffset.ShouldBe(3ul); + } + + [Test] + [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] + public async Task ReceiveDeserializedAsync_WithFailingDeserializer_Should_YieldDeserializationFailed( + Protocol protocol) + { + var client = protocol == Protocol.Tcp + ? await Fixture.CreateTcpClient() + : await Fixture.CreateHttpClient(); + + var testStream = await CreateTestStreamWithMessages(client, protocol); + + var consumerId = protocol == Protocol.Tcp ? 203 : 303; + IggyConsumer consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId), + AutoCommitMode.Disabled, new FailingDeserializer()); + + await consumer.InitAsync(); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await foreach (ReceivedMessage msg in consumer.ReceiveDeserializedAsync(cts.Token)) + { + msg.Status.ShouldBe(MessageStatus.DeserializationFailed); + msg.Data.ShouldBeNull(); + msg.Error.ShouldNotBeNull(); + msg.Error.ShouldBeOfType(); + break; + } + + await consumer.DisposeAsync(); + } + + [Test] + [MethodDataSource(nameof(IggyServerFixture.ProtocolData))] + public async Task ReceiveDeserializedAsync_Should_StopCleanly_OnCancellation(Protocol protocol) + { + var client = protocol == Protocol.Tcp + ? await Fixture.CreateTcpClient() + : await Fixture.CreateHttpClient(); + + var testStream = await CreateTestStreamWithMessages(client, protocol); + + var consumerId = protocol == Protocol.Tcp ? 204 : 304; + IggyConsumer consumer = BuildTypedConsumer(client, testStream, Consumer.New(consumerId), + AutoCommitMode.Disabled, new Utf8StringDeserializer()); + + await consumer.InitAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + + await Should.NotThrowAsync(async () => + { + try + { + await foreach (ReceivedMessage _ in consumer.ReceiveDeserializedAsync(cts.Token)) + { + } + } + catch (OperationCanceledException) + { + } + }); + + await consumer.DisposeAsync(); + } + + private static IggyConsumer BuildTypedConsumer(IIggyClient client, + TestStreamInfo stream, + Consumer consumer, + AutoCommitMode autoCommitMode, + IDeserializer deserializer, + PollingStrategy? pollingStrategy = null) + { + var config = new IggyConsumerConfig + { + StreamId = Identifier.String(stream.StreamId), + TopicId = Identifier.String(stream.TopicId), + Consumer = consumer, + PollingStrategy = pollingStrategy ?? PollingStrategy.Next(), + BatchSize = 10, + PartitionId = 1, + AutoCommitMode = autoCommitMode, + AutoCommit = autoCommitMode != AutoCommitMode.Disabled, + PollingIntervalMs = 0, + Deserializer = deserializer + }; + return new IggyConsumer(client, config, NullLoggerFactory.Instance); + } + + private async Task CreateTestStreamWithMessages(IIggyClient client, Protocol protocol, + uint partitionsCount = 5, int messagesPerPartition = 100) + { + var streamId = $"typed_stream_{Guid.NewGuid()}_{protocol.ToString().ToLowerInvariant()}"; + var topicId = "test_topic"; + + await client.CreateStreamAsync(streamId); + await client.CreateTopicAsync(Identifier.String(streamId), topicId, partitionsCount); + + for (uint partitionId = 0; partitionId < partitionsCount; partitionId++) + { + var messages = new List(); + for (var i = 0; i < messagesPerPartition; i++) + { + messages.Add(new Message(Guid.NewGuid(), + Encoding.UTF8.GetBytes($"Test message {i} for partition {partitionId}"))); + } + + await client.SendMessagesAsync(Identifier.String(streamId), + Identifier.String(topicId), + Partitioning.PartitionId((int)partitionId), + messages); + } + + return new TestStreamInfo(streamId, topicId, partitionsCount, messagesPerPartition); + } + + private record TestStreamInfo(string StreamId, string TopicId, uint PartitionsCount, int MessagesPerPartition); + + private sealed class Utf8StringDeserializer : IDeserializer + { + public string Deserialize(ReadOnlyMemory data) + { + return Encoding.UTF8.GetString(data.Span); + } + } + + private sealed class FailingDeserializer : IDeserializer + { + public string Deserialize(ReadOnlyMemory data) + { + throw new InvalidOperationException("Intentional deserialization failure"); + } + } +} diff --git a/foreign/csharp/Iggy_SDK.sln b/foreign/csharp/Iggy_SDK.sln index 5732f69782..4e1d81c01a 100644 --- a/foreign/csharp/Iggy_SDK.sln +++ b/foreign/csharp/Iggy_SDK.sln @@ -1,4 +1,4 @@ - + Microsoft Visual Studio Solution File, Format Version 12.00 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Iggy_SDK", "Iggy_SDK\Iggy_SDK.csproj", "{661540EB-81F9-492C-828B-CF787BEBD50B}" EndProject diff --git a/foreign/csharp/Iggy_SDK/Consumers/IDeserializer.cs b/foreign/csharp/Iggy_SDK/Consumers/IDeserializer.cs index 4a82d11082..041b75cb3c 100644 --- a/foreign/csharp/Iggy_SDK/Consumers/IDeserializer.cs +++ b/foreign/csharp/Iggy_SDK/Consumers/IDeserializer.cs @@ -21,7 +21,8 @@ namespace Apache.Iggy.Consumers; /// Interface for deserializing message payloads from byte arrays to type T. /// /// No type constraints are enforced on T to provide maximum flexibility. -/// Implementations are responsible for ensuring that the provided byte data can be properly deserialized to the target type. +/// Implementations are responsible for ensuring that the provided byte data can be properly deserialized to the +/// target type. /// /// /// @@ -37,18 +38,13 @@ namespace Apache.Iggy.Consumers; public interface IDeserializer { /// - /// Deserializes a byte array into an instance of type T. + /// Deserializes a read-only memory into an instance of type T. Callers may pass a byte[] directly + /// thanks to the implicit conversion to . /// - /// The byte array containing the serialized data to deserialize. + /// + /// Read-only memory containing the serialized data. The implementation MUST NOT retain a reference to + /// the span after returning. + /// /// An instance of type T representing the deserialized data. - /// - /// Thrown when the data format is invalid and cannot be deserialized. - /// - /// - /// Thrown when the data cannot be deserialized due to invalid content or structure. - /// - /// - /// Thrown when the deserialization operation fails due to state issues. - /// - T Deserialize(byte[] data); + T Deserialize(ReadOnlyMemory data); } diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.Rented.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.Rented.cs new file mode 100644 index 0000000000..e3d35083fd --- /dev/null +++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.Rented.cs @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Apache.Iggy.Contracts; +using Apache.Iggy.Enums; +using Apache.Iggy.Exceptions; +using Apache.Iggy.Headers; +using Apache.Iggy.Kinds; +using Apache.Iggy.Mappers; +using Microsoft.Extensions.Logging; + +namespace Apache.Iggy.Consumers; + +public partial class IggyConsumer +{ + /// + /// Receives messages asynchronously as an async stream of rented messages. Each yielded + /// shares its underlying pooled buffer with the other messages from the + /// same poll and MUST be disposed by the caller when processing is complete. The buffer is returned to the + /// pool once every message of its batch has been disposed. + /// + /// Cancellation token to stop receiving messages. + /// An async enumerable of rented messages. + /// Thrown when has not been called. + public async IAsyncEnumerable ReceiveRentedAsync( + [EnumeratorCancellation] CancellationToken ct = default) + { + if (!_isInitialized) + { + throw new ConsumerNotInitializedException(); + } + + do + { + if (!_rentedChannel.Reader.TryRead(out var message)) + { + await PollRentedMessagesAsync(ct); + continue; + } + + yield return message; + + if (_config.AutoCommitMode == AutoCommitMode.AfterReceive) + { + await StoreOffsetAsync(message.CurrentOffset, message.PartitionId, false, ct); + } + } while (!ct.IsCancellationRequested); + } + + /// + /// Publishes a single rented message from a polled batch to the consumer channel. Called once per + /// message during . The caller acquires a reference on + /// before invocation; the channel reader is expected to release that + /// reference by disposing the produced . Override to redirect + /// rented batches to a different sink (e.g. typed deserialization) — overrides must ensure the + /// acquired reference is released exactly once on every path. + /// + /// Reference-counted handle around the polled batch. Caller has already acquired one reference. + /// + /// The rented message to publish. Payload and raw headers are slices of pooled memory tied to + /// . + /// + /// Partition the message was polled from. + /// Outcome of any prior processing (e.g. decryption). + /// Exception captured if is non-success; otherwise null. + /// Cancellation token. + protected virtual async Task PublishRentedAsync(RentedBatchHandle rental, + RentedMessageResponse message, + uint partitionId, + MessageStatus status, + Exception? error, + CancellationToken ct) + { + await _rentedChannel.Writer.WriteAsync(new ReceivedRentedMessage + { + Handle = rental, + Message = message, + CurrentOffset = message.Header.Offset, + PartitionId = partitionId, + Status = status, + Error = error + }, ct); + } + + /// + /// Polls a rented batch from the server and publishes it via . + /// Handles decryption, offset tracking, and auto-commit logic. Rental lifetime is managed via + /// shared by every produced message. + /// + protected async Task PollRentedMessagesAsync(CancellationToken ct) + { + if (!_joinedConsumerGroup) + { + LogConsumerGroupNotJoinedYetSkippingPolling(); + return; + } + + await _pollingSemaphore.WaitAsync(ct); + + PolledMessagesRental? rental = null; + RentedBatchHandle? batchHandle = null; + try + { + if (_config.PollingIntervalMs > 0) + { + await WaitBeforePollingAsync(ct); + } + + rental = await _client.PollMessagesRentedAsync(_config.StreamId, _config.TopicId, + _config.PartitionId, _config.Consumer, _config.PollingStrategy, _config.BatchSize, + _config.AutoCommit, ct); + + if (rental.Messages.Count == 0) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("No messages received from poll for partition {PartitionId}", rental.PartitionId); + } + + return; + } + + var partitionId = (uint)rental.PartitionId; + + var hasLastOffset = _lastPolledOffset.TryGetValue(rental.PartitionId, out var lastPolledPartitionOffset); + + var currentOffset = 0ul; + + batchHandle = new RentedBatchHandle(rental); + var anyNewMessages = false; + foreach (var message in rental.Messages) + { + if (hasLastOffset && message.Header.Offset <= lastPolledPartitionOffset) + { + continue; + } + + var processedMessage = message; + var status = MessageStatus.Success; + Exception? error = null; + + // TODO: fix encryption allocations by moving it to IggyClient + if (_config.MessageEncryptor != null) + { + try + { + var decryptedPayload = _config.MessageEncryptor.Decrypt(message.Payload.ToArray()); + + Dictionary? decryptedHeaders = null; + if (!message.RawUserHeaders.IsEmpty) + { + var decryptedHeaderBytes = + _config.MessageEncryptor.Decrypt(message.RawUserHeaders.ToArray()); + decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytes); + } + + processedMessage = new RentedMessageResponse + { + Header = message.Header, + Payload = decryptedPayload, + RawUserHeaders = ReadOnlyMemory.Empty, + UserHeaders = decryptedHeaders + }; + } + catch (Exception ex) + { + LogFailedToDecryptMessage(ex, message.Header.Offset); + status = MessageStatus.DecryptionFailed; + error = ex; + } + } + + currentOffset = message.Header.Offset; + batchHandle.Acquire(); + try + { + await PublishRentedAsync(batchHandle, processedMessage, partitionId, status, error, ct); + } + catch + { + batchHandle.Release(); + throw; + } + + anyNewMessages = true; + } + + if (!anyNewMessages + && _config.AutoCommitMode != AutoCommitMode.Disabled) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("No new messages found, committing offset {Offset} for partition {PartitionId}", + lastPolledPartitionOffset, rental.PartitionId); + } + + await StoreOffsetAsync(lastPolledPartitionOffset, partitionId, false, ct); + } + + if (anyNewMessages) + { + _lastPolledOffset.AddOrUpdate(rental.PartitionId, currentOffset, (_, _) => currentOffset); + } + + if (_config.PollingStrategy.Kind == MessagePolling.Offset) + { + _config.PollingStrategy = PollingStrategy.Offset(currentOffset + 1); + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + LogFailedToPollMessages(ex); + _consumerErrorEvents.Publish(new ConsumerErrorEventArgs(ex, "Failed to poll messages")); + } + finally + { + if (batchHandle is not null) + { + batchHandle.Release(); + } + else + { + rental?.Dispose(); + } + + _pollingSemaphore.Release(); + } + } +} diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs index 76264f988c..e76e8133f2 100644 --- a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs +++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs @@ -37,6 +37,7 @@ namespace Apache.Iggy.Consumers; public partial class IggyConsumer : IAsyncDisposable { private readonly Channel _channel; + private readonly Channel _rentedChannel; private readonly IIggyClient _client; private readonly IggyConsumerConfig _config; private readonly SemaphoreSlim _connectionStateSemaphore = new(1, 1); @@ -50,6 +51,9 @@ public partial class IggyConsumer : IAsyncDisposable private volatile bool _joinedConsumerGroup; private long _lastPolledAtMs; + /// Whether this consumer has been initialized via . + protected bool IsInitialized => _isInitialized; + /// /// Initializes a new instance of the class /// @@ -62,7 +66,9 @@ public IggyConsumer(IIggyClient client, IggyConsumerConfig config, ILoggerFactor _config = config; _logger = loggerFactory.CreateLogger(); - _channel = Channel.CreateUnbounded(); + _channel = Channel.CreateBounded(new BoundedChannelOptions((int)config.BatchSize * 2)); + _rentedChannel + = Channel.CreateBounded(new BoundedChannelOptions((int)config.BatchSize * 2)); _consumerErrorEvents = new EventAggregator(loggerFactory); } @@ -112,7 +118,6 @@ await _client.LeaveConsumerGroupAsync(_config.StreamId, _config.TopicId, _consumerErrorEvents.Clear(); _pollingSemaphore.Dispose(); _connectionStateSemaphore.Dispose(); - } /// @@ -187,7 +192,7 @@ public async IAsyncEnumerable ReceiveAsync([EnumeratorCancellat if (_config.AutoCommitMode == AutoCommitMode.AfterReceive) { - await StoreOffsetAsync(message.CurrentOffset, message.PartitionId, ct); + await StoreOffsetAsync(message.CurrentOffset, message.PartitionId, false, ct); } } while (!ct.IsCancellationRequested); } @@ -198,10 +203,17 @@ public async IAsyncEnumerable ReceiveAsync([EnumeratorCancellat /// /// The offset to store /// The partition ID + /// /// Cancellation token - public async Task StoreOffsetAsync(ulong offset, uint partitionId, CancellationToken ct = default) + public async Task StoreOffsetAsync(ulong offset, uint partitionId, bool resetLastPolled = false, + CancellationToken ct = default) { await _client.StoreOffsetAsync(_config.Consumer, _config.StreamId, _config.TopicId, offset, partitionId, ct); + + if (resetLastPolled) + { + _lastPolledOffset[(int)partitionId] = offset; + } } /// @@ -349,30 +361,23 @@ private async Task PollMessagesAsync(CancellationToken ct) _config.PartitionId, _config.Consumer, _config.PollingStrategy, _config.BatchSize, _config.AutoCommit, ct); - var receiveMessages = messages.Messages.Count > 0; - - if (_lastPolledOffset.TryGetValue(messages.PartitionId, out var lastPolledPartitionOffset)) - { - messages.Messages = messages.Messages.Where(x => x.Header.Offset > lastPolledPartitionOffset).ToList(); - } - - if (messages.Messages.Count == 0 - && receiveMessages - && _config.AutoCommitMode != AutoCommitMode.Disabled) - { - _logger.LogDebug("No new messages found, committing offset {Offset} for partition {PartitionId}", - lastPolledPartitionOffset, messages.PartitionId); - await StoreOffsetAsync(lastPolledPartitionOffset, (uint)messages.PartitionId, ct); - } - if (messages.Messages.Count == 0) { return; } + var hasLastOffset = _lastPolledOffset.TryGetValue(messages.PartitionId, + out var lastPolledPartitionOffset); + var currentOffset = 0ul; + var anyNewMessages = false; foreach (var message in messages.Messages) { + if (hasLastOffset && message.Header.Offset <= lastPolledPartitionOffset) + { + continue; + } + var processedMessage = message; var status = MessageStatus.Success; Exception? error = null; @@ -416,6 +421,19 @@ private async Task PollMessagesAsync(CancellationToken ct) await _channel.Writer.WriteAsync(receivedMessage, ct); currentOffset = receivedMessage.CurrentOffset; + anyNewMessages = true; + } + + if (!anyNewMessages) + { + if (_config.AutoCommitMode != AutoCommitMode.Disabled) + { + _logger.LogDebug("No new messages found, committing offset {Offset} for partition {PartitionId}", + lastPolledPartitionOffset, messages.PartitionId); + await StoreOffsetAsync(lastPolledPartitionOffset, (uint)messages.PartitionId, false, ct); + } + + return; } _lastPolledOffset.AddOrUpdate(messages.PartitionId, currentOffset, diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumerOfT.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumerOfT.cs index afaa8cd94b..5f9126169e 100644 --- a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumerOfT.cs +++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumerOfT.cs @@ -16,6 +16,9 @@ // under the License. using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Apache.Iggy.Contracts; +using Apache.Iggy.Exceptions; using Apache.Iggy.IggyClient; using Microsoft.Extensions.Logging; @@ -28,6 +31,9 @@ namespace Apache.Iggy.Consumers; /// The type to deserialize message payloads to public class IggyConsumer : IggyConsumer { + private readonly Channel> _deserializedChannel = + Channel.CreateUnbounded>(); + private readonly IggyConsumerConfig _typedConfig; private readonly ILogger> _typedLogger; @@ -37,70 +43,99 @@ public class IggyConsumer : IggyConsumer /// The Iggy client for server communication /// Typed consumer configuration including deserializer /// Logger instance for diagnostic output - public IggyConsumer(IIggyClient client, IggyConsumerConfig config, ILoggerFactory logger) : base( - client, config, logger) + public IggyConsumer(IIggyClient client, IggyConsumerConfig config, ILoggerFactory logger) : base(client, config, + logger) { _typedConfig = config; _typedLogger = logger.CreateLogger>(); } /// - /// Receives and deserializes messages from the consumer + /// Receives and deserializes messages via the rented poll path. Each polled batch is deserialized + /// in full before any message is yielded — the rented buffer is returned immediately after + /// deserialization, independently of how fast the caller iterates. The caller does not need to + /// dispose anything. /// - /// Cancellation token - /// Async enumerable of deserialized messages with status + /// Cancellation token. + /// Async enumerable of deserialized messages with status. public async IAsyncEnumerable> ReceiveDeserializedAsync( [EnumeratorCancellation] CancellationToken ct = default) { - await foreach (var message in ReceiveAsync(ct)) + if (!IsInitialized) { - if (message.Status != MessageStatus.Success) + throw new ConsumerNotInitializedException(); + } + + do + { + if (!_deserializedChannel.Reader.TryRead(out ReceivedMessage? message)) { - yield return new ReceivedMessage - { - Data = default, - Message = message.Message, - CurrentOffset = message.CurrentOffset, - PartitionId = message.PartitionId, - Status = message.Status, - Error = message.Error - }; + await PollRentedMessagesAsync(ct); continue; } - T? deserializedPayload = default; - Exception? deserializationError = null; - var status = MessageStatus.Success; + yield return message; + + if (_typedConfig.AutoCommitMode == AutoCommitMode.AfterReceive) + { + await StoreOffsetAsync(message.Header.Offset, message.PartitionId, false, ct); + } + } while (!ct.IsCancellationRequested); + } + /// + /// Overrides the base batch-publishing step: instead of routing rented messages through the base + /// class channel, deserializes the entire batch immediately (releasing all rented buffer refs via + /// ) and writes the deserialized results to + /// . Auto-commit is also handled here since the base-class + /// yield path is bypassed. + /// + protected override async Task PublishRentedAsync(RentedBatchHandle rental, RentedMessageResponse message, + uint partitionId, MessageStatus status, + Exception? error, CancellationToken ct) + { + T? data = default; + var deserError = status != MessageStatus.Success ? error : null; + var msgStatus = status; + + if (status == MessageStatus.Success) + { try { - deserializedPayload = Deserialize(message.Message.Payload); + data = Deserialize(message.Payload); } catch (Exception ex) { - _typedLogger.LogError(ex, "Failed to deserialize message at offset {Offset}", message.CurrentOffset); - status = MessageStatus.DeserializationFailed; - deserializationError = ex; + _typedLogger.LogError(ex, "Failed to deserialize message at offset {Offset}", + message.Header.Offset); + msgStatus = MessageStatus.DeserializationFailed; + deserError = ex; } - - yield return new ReceivedMessage - { - Data = deserializedPayload, - Message = message.Message, - CurrentOffset = message.CurrentOffset, - PartitionId = message.PartitionId, - Status = status, - Error = deserializationError - }; } + + var deserialized = new ReceivedMessage + { + Data = data, + Header = message.Header, + UserHeaders = message.UserHeaders, + CurrentOffset = message.Header.Offset, + PartitionId = partitionId, + Status = msgStatus, + Error = deserError + }; + + await _deserializedChannel.Writer.WriteAsync(deserialized, ct); + + rental.Release(); } /// - /// Deserializes a message payload using the configured deserializer + /// Deserializes a message payload from a span using the configured deserializer. Zero-copy when the + /// deserializer overrides the span overload; otherwise falls back to a one-time array copy. /// - /// The raw byte array payload to deserialize - /// The deserialized object of type T - public T Deserialize(byte[] payload) + /// The payload memory to deserialize. + /// The deserialized object of type T. + public T Deserialize(ReadOnlyMemory payload) { return _typedConfig.Deserializer.Deserialize(payload); } diff --git a/foreign/csharp/Iggy_SDK/Consumers/ReceivedMessage.cs b/foreign/csharp/Iggy_SDK/Consumers/ReceivedMessage.cs index 9c5160cc2d..24213821c0 100644 --- a/foreign/csharp/Iggy_SDK/Consumers/ReceivedMessage.cs +++ b/foreign/csharp/Iggy_SDK/Consumers/ReceivedMessage.cs @@ -16,19 +16,53 @@ // under the License. using Apache.Iggy.Contracts; +using Apache.Iggy.Headers; +using Apache.Iggy.Messages; namespace Apache.Iggy.Consumers; /// -/// Represents a message received from the Iggy consumer with a deserialized payload of type T +/// Represents a message whose payload was deserialized directly from rented memory. The rented buffer has +/// already been returned to the pool by the time this message is yielded, so only the header, user headers, +/// and the deserialized are available; the raw payload bytes are not retained. /// -/// The type of the deserialized message payload -public class ReceivedMessage : ReceivedMessage +/// The deserialized payload type. +public sealed class ReceivedMessage { /// - /// The deserialized message payload. Will be null if deserialization failed. + /// Message header. + /// + public required MessageHeader Header { get; init; } + + /// + /// The deserialized payload. Null if is not . /// public T? Data { get; init; } + + /// + /// Parsed user headers, if present. + /// + public Dictionary? UserHeaders { get; init; } + + /// + /// The current offset of this message in the partition. + /// + public required ulong CurrentOffset { get; init; } + + /// + /// The partition ID from which this message was consumed. + /// + public uint PartitionId { get; init; } + + /// + /// The status of the message (Success, DecryptionFailed, DeserializationFailed). + /// + public MessageStatus Status { get; init; } = MessageStatus.Success; + + /// + /// The exception that occurred during processing, if any. + /// + public Exception? Error { get; init; } } /// diff --git a/foreign/csharp/Iggy_SDK/Consumers/ReceivedRentedMessage.cs b/foreign/csharp/Iggy_SDK/Consumers/ReceivedRentedMessage.cs new file mode 100644 index 0000000000..517f43afd1 --- /dev/null +++ b/foreign/csharp/Iggy_SDK/Consumers/ReceivedRentedMessage.cs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Apache.Iggy.Contracts; + +namespace Apache.Iggy.Consumers; + +/// +/// Represents a message received from the Iggy consumer whose payload and raw headers are backed by rented memory +/// shared across a polled batch. The caller SHOULD dispose the message once processing is complete for deterministic +/// release of the underlying pool buffer. If the caller forgets, the buffer is still returned to the pool when the +/// owning becomes unreachable and its finalizer runs - non-deterministic but safe. +/// +public sealed class ReceivedRentedMessage : IDisposable +{ + private int _disposed; + internal RentedBatchHandle? Handle { get; init; } + + /// + /// The underlying rented message response containing headers and rented payload memory. + /// + public required RentedMessageResponse Message { get; init; } + + /// + /// The current offset of this message in the partition. + /// + public required ulong CurrentOffset { get; init; } + + /// + /// The partition ID from which this message was consumed. + /// + public uint PartitionId { get; init; } + + /// + /// The status of the message (Success, DecryptionFailed). + /// + public MessageStatus Status { get; init; } = MessageStatus.Success; + + /// + /// The exception that occurred during processing, if any. + /// + public Exception? Error { get; init; } + + /// + /// Releases this message's reference on the underlying rental. When the final message of a batch is disposed, + /// the rented buffer is returned to the pool and any payload/raw header slices are invalidated. + /// + public void Dispose() + { + if (Interlocked.Exchange(ref _disposed, 1) == 1) + { + return; + } + + Handle?.Release(); + } +} + +/// +/// Reference-counted handle around a . Shared by all +/// instances produced from one poll; the rental is disposed +/// when the reference count drops to zero. +/// +public sealed class RentedBatchHandle : IDisposable +{ + private readonly PolledMessagesRental _rental; + private int _refCount = 1; + + /// + /// Creates a new handle with a single self-reference held by the constructing producer. The producer + /// must call before each publish and release the self-reference (via + /// or ) once it has finished producing. + /// + /// The polled messages rental whose lifetime this handle manages. + public RentedBatchHandle(PolledMessagesRental rental) + { + _rental = rental; + } + + /// + /// Releases one reference on the underlying rental. Equivalent to . + /// + public void Dispose() + { + Release(); + } + + /// + /// Acquires an additional reference. Must be balanced by a matching . + /// + public void Acquire() + { + Interlocked.Increment(ref _refCount); + } + + /// + /// Decrements the reference count. When the count reaches zero, the underlying rental is disposed + /// and its pool buffer returned. + /// + public void Release() + { + var remaining = Interlocked.Decrement(ref _refCount); + if (remaining <= 0) + { + _rental.Dispose(); + } + } +} diff --git a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs index 9a5eb55afd..f29ca79fa8 100644 --- a/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs +++ b/foreign/csharp/Iggy_SDK/Contracts/MessageResponse.cs @@ -18,6 +18,7 @@ using System.Text.Json.Serialization; using Apache.Iggy.Headers; using Apache.Iggy.JsonConverters; +using Apache.Iggy.Mappers; using Apache.Iggy.Messages; namespace Apache.Iggy.Contracts; @@ -28,6 +29,10 @@ namespace Apache.Iggy.Contracts; [JsonConverter(typeof(MessageResponseConverter))] public sealed class MessageResponse { + private byte[]? _rawUserHeaders; + private Dictionary? _userHeaders; + private bool _userHeadersInitialized; + /// /// Message header. /// @@ -41,12 +46,51 @@ public sealed class MessageResponse /// /// Headers defined by the user. /// - public Dictionary? UserHeaders { get; set; } + public Dictionary? UserHeaders + { + get + { + if (!_userHeadersInitialized) + { + _userHeaders = _rawUserHeaders is { Length: > 0 } + ? BinaryMapper.TryMapHeaders(_rawUserHeaders) + : null; + _userHeadersInitialized = true; + } + + return _userHeaders; + } + set + { + _userHeaders = value; + _userHeadersInitialized = true; + } + } /// /// Raw user header bytes before deserialization. /// Used internally for decrypting encrypted headers. /// [JsonIgnore] - internal byte[]? RawUserHeaders { get; set; } + internal byte[]? RawUserHeaders + { + get => _rawUserHeaders; + set + { + _rawUserHeaders = value; + _userHeaders = null; + _userHeadersInitialized = false; + } + } + + internal void ParseUserHeaders() + { + if (!_userHeadersInitialized) + { + _userHeaders = _rawUserHeaders is { Length: > 0 } + ? BinaryMapper.TryMapHeaders(_rawUserHeaders) + : null; + _userHeadersInitialized = true; + } + } } diff --git a/foreign/csharp/Iggy_SDK/Contracts/PolledMessagesRental.cs b/foreign/csharp/Iggy_SDK/Contracts/PolledMessagesRental.cs new file mode 100644 index 0000000000..72bd6f6bfc --- /dev/null +++ b/foreign/csharp/Iggy_SDK/Contracts/PolledMessagesRental.cs @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Buffers; + +namespace Apache.Iggy.Contracts; + +/// +/// Represents a rented poll result whose payload and raw header memory remain valid until disposed. +/// +public sealed class PolledMessagesRental : IDisposable +{ + private readonly IMemoryOwner _owner; + private int _disposed; + + /// + /// Partition identifier for the messages. + /// + public required int PartitionId { get; init; } + + /// + /// Current offset for the partition. + /// + public required ulong CurrentOffset { get; init; } + + /// + /// Rented messages. + /// + public required IReadOnlyList Messages { get; init; } + + internal PolledMessagesRental(IMemoryOwner owner) + { + _owner = owner; + } + + /// + /// Disposes the rental and returns the underlying buffer to the pool. + /// + public void Dispose() + { + if (Interlocked.Exchange(ref _disposed, 1) != 0) + { + return; + } + + _owner.Dispose(); + } +} diff --git a/foreign/csharp/Iggy_SDK/Contracts/RentedMessageResponse.cs b/foreign/csharp/Iggy_SDK/Contracts/RentedMessageResponse.cs new file mode 100644 index 0000000000..f4db2413af --- /dev/null +++ b/foreign/csharp/Iggy_SDK/Contracts/RentedMessageResponse.cs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Apache.Iggy.Headers; +using Apache.Iggy.Mappers; +using Apache.Iggy.Messages; + +namespace Apache.Iggy.Contracts; + +/// +/// Response containing rented payload and raw header memory. +/// The payload and raw headers are only valid while the owning is alive. +/// +public sealed class RentedMessageResponse +{ + private Dictionary? _userHeaders; + private bool _userHeadersInitialized; + + /// + /// Message header. + /// + public required MessageHeader Header { get; init; } + + /// + /// Message payload backed by rented memory. + /// + public required ReadOnlyMemory Payload { get; init; } + + /// + /// Raw user header bytes backed by rented memory. + /// + public ReadOnlyMemory RawUserHeaders { get; init; } + + /// + /// Parsed user headers. Parsed lazily and cached on first access. + /// + public Dictionary? UserHeaders + { + get + { + if (!_userHeadersInitialized) + { + _userHeaders = RawUserHeaders.IsEmpty + ? null + : BinaryMapper.TryMapHeaders(RawUserHeaders.Span); + _userHeadersInitialized = true; + } + + return _userHeaders; + } + init + { + _userHeaders = value; + _userHeadersInitialized = true; + } + } +} diff --git a/foreign/csharp/Iggy_SDK/Extensions/IggyClientExtension.cs b/foreign/csharp/Iggy_SDK/Extensions/IggyClientExtension.cs index 82d1a8139c..ffa5afc16f 100644 --- a/foreign/csharp/Iggy_SDK/Extensions/IggyClientExtension.cs +++ b/foreign/csharp/Iggy_SDK/Extensions/IggyClientExtension.cs @@ -52,10 +52,10 @@ public static IggyConsumerBuilder CreateConsumerBuilder(this IIggyClient client, /// Consumer /// Optional deserializer /// Iggy consumer builder - public static IggyConsumerBuilder CreateConsumerBuilder(this IIggyClient client, Identifier streamId, - Identifier topicId, Consumer consumer, IDeserializer deserializer) where T : IDeserializer + public static IggyConsumerBuilder CreateConsumerBuilder(this IIggyClient client, Identifier streamId, + Identifier topicId, Consumer consumer, IDeserializer deserializer) { - return IggyConsumerBuilder.Create(client, streamId, topicId, consumer); + return IggyConsumerBuilder.Create(client, streamId, topicId, consumer, deserializer); } /// diff --git a/foreign/csharp/Iggy_SDK/IggyClient/IIggyConsumer.cs b/foreign/csharp/Iggy_SDK/IggyClient/IIggyConsumer.cs index 476b09b1b1..418fd5a93f 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/IIggyConsumer.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/IIggyConsumer.cs @@ -46,6 +46,18 @@ Task PollMessagesAsync(Identifier streamId, Identifier topicId, Consumer consumer, PollingStrategy pollingStrategy, uint count, bool autoCommit, CancellationToken token = default); + /// + /// Polls messages from a specified topic and partition while renting the payload buffers from a shared pool + /// instead of copying them into byte arrays. + /// + /// + /// The returned rental must be disposed when the caller is done reading the payload and raw header memory. + /// Payload and raw header slices are invalidated once the rental is disposed. + /// + Task PollMessagesRentedAsync(Identifier streamId, Identifier topicId, uint? partitionId, + Consumer consumer, PollingStrategy pollingStrategy, uint count, bool autoCommit, + CancellationToken token = default); + /// /// Polls messages from a specified topic using a pre-constructed request. /// @@ -60,4 +72,14 @@ Task PollMessagesAsync(MessageFetchRequest request, Cancellation return PollMessagesAsync(request.StreamId, request.TopicId, request.PartitionId, request.Consumer, request.PollingStrategy, request.Count, request.AutoCommit, token); } + + /// + /// Polls messages from a specified topic using a pre-constructed request while renting the payload buffers + /// from a shared pool. + /// + Task PollMessagesRentedAsync(MessageFetchRequest request, CancellationToken token = default) + { + return PollMessagesRentedAsync(request.StreamId, request.TopicId, request.PartitionId, request.Consumer, + request.PollingStrategy, request.Count, request.AutoCommit, token); + } } diff --git a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs index fac18e1de0..9b1ce3905a 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs @@ -28,6 +28,7 @@ using Apache.Iggy.Enums; using Apache.Iggy.Exceptions; using Apache.Iggy.Kinds; +using Apache.Iggy.Mappers; using Apache.Iggy.Messages; using Apache.Iggy.StringHandlers; using Apache.Iggy.Utils; @@ -303,6 +304,17 @@ public async Task PollMessagesAsync(Identifier streamId, Identif return PolledMessages.Empty; } + /// + public async Task PollMessagesRentedAsync(Identifier streamId, Identifier topicId, + uint? partitionId, + Consumer consumer, + PollingStrategy pollingStrategy, uint count, bool autoCommit, CancellationToken token = default) + { + var messages = await PollMessagesAsync(streamId, topicId, partitionId, consumer, pollingStrategy, count, + autoCommit, token); + return BinaryMapper.ToRentedMessages(messages); + } + /// public async Task StoreOffsetAsync(Consumer consumer, Identifier streamId, Identifier topicId, ulong offset, uint? partitionId, CancellationToken token = default) diff --git a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs index ed8d424451..d91425d947 100644 --- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs +++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs @@ -46,6 +46,7 @@ public sealed class TcpMessageStream : IIggyClient private readonly EventAggregator _connectionEvents; private readonly SemaphoreSlim _connectionSemaphore; private readonly ILogger _logger; + private readonly byte[] _responseHeaderBuffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE]; private readonly SemaphoreSlim _sendingSemaphore; private string _currentAddress = string.Empty; private X509Certificate2Collection _customCaStore = []; @@ -102,14 +103,14 @@ public string GetCurrentAddress() var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_STREAM_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { throw new InvalidResponseException("Received empty response while trying to create stream."); } - return BinaryMapper.MapStream(responseBuffer); + return BinaryMapper.MapStream(responseBuffer.Memory.Span); } /// @@ -119,14 +120,14 @@ public string GetCurrentAddress() var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STREAM_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapStream(responseBuffer); + return BinaryMapper.MapStream(responseBuffer.Memory.Span); } /// @@ -136,14 +137,14 @@ public async Task> GetStreamsAsync(CancellationTok var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STREAMS_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return []; } - return BinaryMapper.MapStreams(responseBuffer); + return BinaryMapper.MapStreams(responseBuffer.Memory.Span); } /// @@ -153,7 +154,7 @@ public async Task UpdateStreamAsync(Identifier streamId, string name, Cancellati var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_STREAM_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -163,7 +164,7 @@ public async Task PurgeStreamAsync(Identifier streamId, CancellationToken token var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.PURGE_STREAM_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -173,7 +174,7 @@ public async Task DeleteStreamAsync(Identifier streamId, CancellationToken token var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_STREAM_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -184,14 +185,14 @@ public async Task> GetTopicsAsync(Identifier stream var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_TOPICS_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return []; } - return BinaryMapper.MapTopics(responseBuffer); + return BinaryMapper.MapTopics(responseBuffer.Memory.Span); } /// @@ -202,14 +203,14 @@ public async Task> GetTopicsAsync(Identifier stream var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_TOPIC_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapTopic(responseBuffer); + return BinaryMapper.MapTopic(responseBuffer.Memory.Span); } /// @@ -223,14 +224,14 @@ public async Task> GetTopicsAsync(Identifier stream var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_TOPIC_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapTopic(responseBuffer); + return BinaryMapper.MapTopic(responseBuffer.Memory.Span); } /// @@ -245,7 +246,7 @@ public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, stri var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_TOPIC_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -255,7 +256,7 @@ public async Task DeleteTopicAsync(Identifier streamId, Identifier topicId, Canc var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_TOPIC_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -265,7 +266,7 @@ public async Task PurgeTopicAsync(Identifier streamId, Identifier topicId, Cance var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.PURGE_TOPIC_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } @@ -289,7 +290,7 @@ public async Task SendMessagesAsync(Identifier streamId, Identifier topicId, Par TcpMessageStreamHelpers.CreatePayload(payloadBuffer.Memory.Span[..payloadBufferSize], messageBuffer.Memory.Span[..messageBufferSize], CommandCodes.SEND_MESSAGES_CODE); - await SendWithResponseAsync(payloadBuffer.Memory[..payloadBufferSize].ToArray(), token); + await SendAckAsync(payloadBuffer.Memory[..payloadBufferSize], token); } finally { @@ -307,27 +308,49 @@ public async Task FlushUnsavedBufferAsync(Identifier streamId, Identifier topicI var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.FLUSH_UNSAVED_BUFFER_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// public async Task PollMessagesAsync(Identifier streamId, Identifier topicId, uint? partitionId, Consumer consumer, PollingStrategy pollingStrategy, uint count, bool autoCommit, CancellationToken token = default) + { + using var rental = await PollMessagesRentedAsync(streamId, topicId, partitionId, consumer, pollingStrategy, + count, autoCommit, token); + return BinaryMapper.MaterializeMessages(rental); + } + + /// + public async Task PollMessagesRentedAsync(Identifier streamId, Identifier topicId, + uint? partitionId, + Consumer consumer, + PollingStrategy pollingStrategy, uint count, bool autoCommit, CancellationToken token = default) { var messageBufferSize = CalculateMessageBufferSize(streamId, topicId, consumer); var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize); - var message = new byte[messageBufferSize]; - var payload = new byte[payloadBufferSize]; - - TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize], consumer, streamId, - topicId, pollingStrategy, count, autoCommit, partitionId); - TcpMessageStreamHelpers.CreatePayload(payload, message.AsSpan()[..messageBufferSize], - CommandCodes.POLL_MESSAGES_CODE); + var payload = ArrayPool.Shared.Rent(payloadBufferSize); + IMemoryOwner? responseBuffer = null; - var responseBuffer = await SendWithResponseAsync(payload, token); + try + { + TcpContracts.GetMessages(payload.AsSpan().Slice(8, messageBufferSize), consumer, streamId, + topicId, pollingStrategy, count, autoCommit, partitionId); + BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan()[..4], messageBufferSize + 4); + BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan()[4..8], CommandCodes.POLL_MESSAGES_CODE); - return BinaryMapper.MapMessages(responseBuffer); + responseBuffer = await SendWithResponseAsync(payload.AsMemory(0, payloadBufferSize), token); + return BinaryMapper.MapRentedMessages(responseBuffer.Memory, responseBuffer); + } + catch + { + responseBuffer?.Dispose(); + throw; + } + finally + { + ArrayPool.Shared.Return(payload); + } } /// @@ -338,7 +361,7 @@ public async Task StoreOffsetAsync(Consumer consumer, Identifier streamId, Ident var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.STORE_CONSUMER_OFFSET_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -349,14 +372,14 @@ public async Task StoreOffsetAsync(Consumer consumer, Identifier streamId, Ident var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_OFFSET_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapOffsets(responseBuffer); + return BinaryMapper.MapOffsets(responseBuffer.Memory.Span); } /// @@ -367,7 +390,7 @@ public async Task DeleteOffsetAsync(Consumer consumer, Identifier streamId, Iden var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_CONSUMER_OFFSET_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -379,14 +402,14 @@ public async Task> GetConsumerGroupsAsync(I var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_GROUPS_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return []; } - return BinaryMapper.MapConsumerGroups(responseBuffer); + return BinaryMapper.MapConsumerGroups(responseBuffer.Memory.Span); } /// @@ -397,14 +420,14 @@ public async Task> GetConsumerGroupsAsync(I var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_GROUP_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapConsumerGroup(responseBuffer); + return BinaryMapper.MapConsumerGroup(responseBuffer.Memory.Span); } /// @@ -415,14 +438,14 @@ public async Task> GetConsumerGroupsAsync(I var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_CONSUMER_GROUP_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapConsumerGroup(responseBuffer); + return BinaryMapper.MapConsumerGroup(responseBuffer.Memory.Span); } /// @@ -433,7 +456,7 @@ public async Task DeleteConsumerGroupAsync(Identifier streamId, Identifier topic var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_CONSUMER_GROUP_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -444,7 +467,7 @@ public async Task JoinConsumerGroupAsync(Identifier streamId, Identifier topicId var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.JOIN_CONSUMER_GROUP_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -455,7 +478,7 @@ public async Task LeaveConsumerGroupAsync(Identifier streamId, Identifier topicI var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LEAVE_CONSUMER_GROUP_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -466,7 +489,7 @@ public async Task DeletePartitionsAsync(Identifier streamId, Identifier topicId, var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_PARTITIONS_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -477,7 +500,7 @@ public async Task CreatePartitionsAsync(Identifier streamId, Identifier topicId, var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PARTITIONS_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -488,7 +511,7 @@ public async Task DeleteSegmentsAsync(Identifier streamId, Identifier topicId, u var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_SEGMENTS_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -498,14 +521,14 @@ public async Task DeleteSegmentsAsync(Identifier streamId, Identifier topicId, u var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_ME_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapClient(responseBuffer); + return BinaryMapper.MapClient(responseBuffer.Memory.Span); } /// @@ -515,14 +538,14 @@ public async Task DeleteSegmentsAsync(Identifier streamId, Identifier topicId, u var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STATS_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapStats(responseBuffer); + return BinaryMapper.MapStats(responseBuffer.Memory.Span); } /// @@ -532,14 +555,14 @@ public async Task DeleteSegmentsAsync(Identifier streamId, Identifier topicId, u var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CLUSTER_METADATA_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapClusterMetadata(responseBuffer); + return BinaryMapper.MapClusterMetadata(responseBuffer.Memory.Span); } /// @@ -549,7 +572,7 @@ public async Task PingAsync(CancellationToken token = default) var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.PING_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -560,7 +583,9 @@ public async Task GetSnapshotAsync(SnapshotCompression compression, var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_SNAPSHOT_CODE); - return await SendWithResponseAsync(payload, token); + using IMemoryOwner result = await SendWithResponseAsync(payload, token); + + return result.Memory.Span.ToArray(); } /// @@ -598,14 +623,14 @@ public async Task> GetClientsAsync(CancellationTok var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CLIENTS_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return []; } - return BinaryMapper.MapClients(responseBuffer); + return BinaryMapper.MapClients(responseBuffer.Memory.Span); } /// @@ -615,14 +640,14 @@ public async Task> GetClientsAsync(CancellationTok var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CLIENT_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapClient(responseBuffer); + return BinaryMapper.MapClient(responseBuffer.Memory.Span); } /// @@ -632,14 +657,14 @@ public async Task> GetClientsAsync(CancellationTok var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_USER_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapUser(responseBuffer); + return BinaryMapper.MapUser(responseBuffer.Memory.Span); } /// @@ -649,14 +674,14 @@ public async Task> GetUsersAsync(CancellationToken t var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_USERS_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return []; } - return BinaryMapper.MapUsers(responseBuffer); + return BinaryMapper.MapUsers(responseBuffer.Memory.Span); } /// @@ -667,14 +692,14 @@ public async Task> GetUsersAsync(CancellationToken t var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_USER_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapUser(responseBuffer); + return BinaryMapper.MapUser(responseBuffer.Memory.Span); } /// @@ -684,7 +709,7 @@ public async Task DeleteUserAsync(Identifier userId, CancellationToken token = d var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_USER_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -695,7 +720,7 @@ public async Task UpdateUserAsync(Identifier userId, string? userName = null, Us var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_USER_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -706,7 +731,7 @@ public async Task UpdatePermissionsAsync(Identifier userId, Permissions? permiss var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_PERMISSIONS_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -717,7 +742,7 @@ public async Task ChangePasswordAsync(Identifier userId, string currentPassword, var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CHANGE_PASSWORD_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -734,14 +759,14 @@ public async Task ChangePasswordAsync(Identifier userId, string currentPassword, TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGIN_USER_CODE); SetConnectionStateAsync(ConnectionState.Authenticating); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length <= 0) + if (responseBuffer.Memory.Length == 0) { return null; } - var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..responseBuffer.Length]); + var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.Memory.Span[..responseBuffer.Memory.Length]); SetConnectionStateAsync(ConnectionState.Authenticated); if (await RedirectAsync(token)) @@ -761,7 +786,7 @@ public async Task LogoutUserAsync(CancellationToken token = default) var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGOUT_USER_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -772,14 +797,14 @@ public async Task> GetPersonalAccessT var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_PERSONAL_ACCESS_TOKENS_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return []; } - return BinaryMapper.MapPersonalAccessTokens(responseBuffer); + return BinaryMapper.MapPersonalAccessTokens(responseBuffer.Memory.Span); } /// @@ -790,14 +815,14 @@ public async Task> GetPersonalAccessT var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PERSONAL_ACCESS_TOKEN_CODE); - var responseBuffer = await SendWithResponseAsync(payload, token); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, token); - if (responseBuffer.Length == 0) + if (responseBuffer.Memory.Length == 0) { return null; } - return BinaryMapper.MapRawPersonalAccessToken(responseBuffer); + return BinaryMapper.MapRawPersonalAccessToken(responseBuffer.Memory.Span); } /// @@ -807,7 +832,7 @@ public async Task DeletePersonalAccessTokenAsync(string name, CancellationToken var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + message.Length]; TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_PERSONAL_ACCESS_TOKEN_CODE); - await SendWithResponseAsync(payload, token); + await SendAckAsync(payload, token); } /// @@ -818,14 +843,14 @@ public async Task DeletePersonalAccessTokenAsync(string name, CancellationToken TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE); SetConnectionStateAsync(ConnectionState.Authenticating); - var responseBuffer = await SendWithResponseAsync(payload, ct); + using IMemoryOwner responseBuffer = await SendWithResponseAsync(payload, ct); - if (responseBuffer.Length <= 1) + if (responseBuffer.Memory.Length == 0) { return null; } - var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..4]); + var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.Memory.Span[..4]); SetConnectionStateAsync(ConnectionState.Authenticated); @@ -968,7 +993,13 @@ private async Task CreateSslStreamAndAuthenticate(Socket so return new TcpConnectionStream(sslStream); } - private async Task SendWithResponseAsync(byte[] payload, CancellationToken token = default) + private async Task SendAckAsync(ReadOnlyMemory payload, CancellationToken token = default) + { + using IMemoryOwner _ = await SendWithResponseAsync(payload, token); + } + + private async Task> SendWithResponseAsync(ReadOnlyMemory payload, + CancellationToken token = default) { try { @@ -988,7 +1019,8 @@ private async Task SendWithResponseAsync(byte[] payload, CancellationTok } } - private async Task HandleReconnectionAsync(byte[] payload, CancellationToken token) + private async Task> HandleReconnectionAsync(ReadOnlyMemory payload, + CancellationToken token) { var currentTime = DateTimeOffset.UtcNow; await _connectionSemaphore.WaitAsync(token); @@ -1018,27 +1050,27 @@ private async Task HandleReconnectionAsync(byte[] payload, CancellationT } } - private async Task SendRawAsync(byte[] payload, CancellationToken token) + private async Task> SendRawAsync(ReadOnlyMemory payload, CancellationToken token) { if (_state is ConnectionState.Disconnected or ConnectionState.Connecting) { throw new NotConnectedException(); } + await _sendingSemaphore.WaitAsync(token); + try { - await _sendingSemaphore.WaitAsync(token); await _stream.SendAsync(payload, token); await _stream.FlushAsync(token); // Read the 8-byte header (4 bytes status + 4 bytes length) - var buffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE]; var totalRead = 0; while (totalRead < BufferSizes.EXPECTED_RESPONSE_SIZE) { var readBytes = await _stream.ReadAsync( - buffer.AsMemory(totalRead, BufferSizes.EXPECTED_RESPONSE_SIZE - totalRead), + _responseHeaderBuffer.AsMemory(totalRead, BufferSizes.EXPECTED_RESPONSE_SIZE - totalRead), token); if (readBytes == 0) { @@ -1048,7 +1080,7 @@ var readBytes totalRead += readBytes; } - var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer); + var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(_responseHeaderBuffer); if (response.Status != 0) { @@ -1058,12 +1090,13 @@ var readBytes $"Invalid response status code: {response.Status}"); } - var errorBuffer = new byte[response.Length]; + + using var errorBuffer = ArrayPoolHelper.Rent(response.Length); totalRead = 0; while (totalRead < response.Length) { var readBytes - = await _stream.ReadAsync(errorBuffer.AsMemory(totalRead, response.Length - totalRead), token); + = await _stream.ReadAsync(errorBuffer.Memory.Slice(totalRead, response.Length - totalRead), token); if (readBytes == 0) { throw new IggyZeroBytesException(); @@ -1072,26 +1105,36 @@ var readBytes totalRead += readBytes; } - throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer)); + throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer.Memory.Span)); } if (response.Length == 0) { - return []; + return EmptyMemoryOwner.Instance; } - var responseBuffer = new byte[response.Length]; - totalRead = 0; - while (totalRead < response.Length) + var responseBuffer = ArrayPoolHelper.Rent(response.Length); + try { - var readBytes = await _stream.ReadAsync(responseBuffer.AsMemory(totalRead, response.Length - totalRead), - token); - if (readBytes == 0) + totalRead = 0; + while (totalRead < response.Length) { - throw new IggyZeroBytesException(); - } + var readBytes + = await _stream.ReadAsync(responseBuffer.Memory.Slice(totalRead, response.Length - totalRead), + token); - totalRead += readBytes; + if (readBytes == 0) + { + throw new IggyZeroBytesException(); + } + + totalRead += readBytes; + } + } + catch + { + responseBuffer.Dispose(); + throw; } return responseBuffer; @@ -1236,4 +1279,59 @@ private async Task RedirectAsync(CancellationToken token) SetConnectionStateAsync(ConnectionState.Disconnected); return true; } + + internal sealed class EmptyMemoryOwner : IMemoryOwner + { + public static readonly EmptyMemoryOwner Instance = new(); + + private EmptyMemoryOwner() + { + } + + public Memory Memory => Memory.Empty; + + public void Dispose() + { + } + } +} + +internal static class ArrayPoolHelper +{ + public static SlicedMemoryOwner Rent(int minimumLength) + { + return new SlicedMemoryOwner(minimumLength); + } + + internal sealed class SlicedMemoryOwner(int minimumLength) : IMemoryOwner + { + private readonly byte[] _value = ArrayPool.Shared.Rent(minimumLength); + private int _disposed; + + public Memory Memory => _value.AsMemory()[..minimumLength]; + + private void Dispose(bool suppressFinalize) + { + if (Interlocked.Exchange(ref _disposed, 1) != 0) + { + return; + } + + ArrayPool.Shared.Return(_value); + if (suppressFinalize) + { + GC.SuppressFinalize(this); + } + } + + ~SlicedMemoryOwner() + { + Dispose(false); + } + + public void Dispose() + { + Dispose(true); + } + } } diff --git a/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs b/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs index 9cb9821d82..dc41363dcd 100644 --- a/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs +++ b/foreign/csharp/Iggy_SDK/JsonConverters/MessageResponseConverter.cs @@ -73,8 +73,10 @@ public override MessageResponse Read(ref Utf8JsonReader reader, Type typeToConve } else { - userHeaders = HeadersConverter.Read(ref reader, typeof(Dictionary), options); + userHeaders = HeadersConverter.Read(ref reader, typeof(Dictionary), + options); } + break; default: reader.Skip(); @@ -82,13 +84,19 @@ public override MessageResponse Read(ref Utf8JsonReader reader, Type typeToConve } } - return new MessageResponse + var response = new MessageResponse { Header = header ?? throw new JsonException("Missing 'header' field."), Payload = payload ?? throw new JsonException("Missing 'payload' field."), - UserHeaders = userHeaders, RawUserHeaders = rawUserHeaders }; + + if (rawUserHeaders is null) + { + response.UserHeaders = userHeaders; + } + + return response; } public override void Write(Utf8JsonWriter writer, MessageResponse value, JsonSerializerOptions options) diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs index 890b668022..3f979b13d5 100644 --- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs +++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs @@ -23,6 +23,7 @@ using Apache.Iggy.Enums; using Apache.Iggy.Extensions; using Apache.Iggy.Headers; +using Apache.Iggy.IggyClient.Implementations; using Apache.Iggy.Messages; using Apache.Iggy.Utils; @@ -332,85 +333,75 @@ internal static OffsetResponse MapOffsets(ReadOnlySpan payload) }; } - internal static PolledMessages MapMessages(ReadOnlySpan payload) + internal static PolledMessagesRental MapRentedMessages(ReadOnlyMemory payload, + IMemoryOwner payloadOwner) { + ReadOnlySpan span = payload.Span; var length = payload.Length; - var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[..4]); - var currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(payload[4..12]); - var messagesCount = BinaryPrimitives.ReadUInt32LittleEndian(payload[12..16]); + var partitionId = BinaryPrimitives.ReadInt32LittleEndian(span[..4]); + var currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(span[4..12]); + var messagesCount = BinaryPrimitives.ReadUInt32LittleEndian(span[12..16]); var position = 16; if (position >= length) { - return PolledMessages.Empty; + return new PolledMessagesRental(payloadOwner) + { + PartitionId = partitionId, + CurrentOffset = currentOffset, + Messages = [] + }; } - List messages = new(); + List messages = new((int)messagesCount); while (position < length) { - var checksum = BinaryPrimitives.ReadUInt64LittleEndian(payload[position..(position + 8)]); - var id = BinaryPrimitives.ReadUInt128LittleEndian(payload[(position + 8)..(position + 24)]); - var offset = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 24)..(position + 32)]); - var timestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 32)..(position + 40)]); - var originTimestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 40)..(position + 48)]); - var headersLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 48)..(position + 52)]); - var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 52)..(position + 56)]); - var reserved = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 56)..(position + 64)]); + var checksum = BinaryPrimitives.ReadUInt64LittleEndian(span[position..(position + 8)]); + var id = BinaryPrimitives.ReadUInt128LittleEndian(span[(position + 8)..(position + 24)]); + var offset = BinaryPrimitives.ReadUInt64LittleEndian(span[(position + 24)..(position + 32)]); + var timestamp = BinaryPrimitives.ReadUInt64LittleEndian(span[(position + 32)..(position + 40)]); + var originTimestamp = BinaryPrimitives.ReadUInt64LittleEndian(span[(position + 40)..(position + 48)]); + var headersLength = BinaryPrimitives.ReadInt32LittleEndian(span[(position + 48)..(position + 52)]); + var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(span[(position + 52)..(position + 56)]); + var reserved = BinaryPrimitives.ReadUInt64LittleEndian(span[(position + 56)..(position + 64)]); var wireHeadersLength = headersLength; - byte[]? rawUserHeaders = null; - Dictionary? headers; - if (headersLength == 0) - { - headers = null; - } - else if (headersLength < 0) + if (headersLength < 0) { throw new ArgumentOutOfRangeException(); } - else - { - rawUserHeaders = payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)].ToArray(); - headers = TryMapHeaders(rawUserHeaders); - } var payloadRangeStart = position + 64; var payloadRangeEnd = position + 64 + payloadLength; - if (payloadRangeStart > length || payloadRangeEnd > length) + var headersRangeStart = payloadRangeEnd; + var headersRangeEnd = headersRangeStart + headersLength; + if (payloadRangeStart > length || payloadRangeEnd > length || headersRangeStart > length || + headersRangeEnd > length) { break; } - ReadOnlySpan payloadSlice = payload[payloadRangeStart..payloadRangeEnd]; - var messagePayload = ArrayPool.Shared.Rent(payloadSlice.Length); - var payloadSliceLen = payloadSlice.Length; + ReadOnlyMemory payloadSlice = payload.Slice(payloadRangeStart, payloadLength); + ReadOnlyMemory rawHeaders = headersLength > 0 + ? payload.Slice(headersRangeStart, headersLength) + : ReadOnlyMemory.Empty; - try + messages.Add(new RentedMessageResponse { - payloadSlice.CopyTo(messagePayload.AsSpan()[..payloadSliceLen]); - - messages.Add(new MessageResponse + Header = new MessageHeader { - Header = new MessageHeader - { - Checksum = checksum, - Id = id, - Offset = offset, - OriginTimestamp = originTimestamp, - PayloadLength = payloadLength, - Timestamp = DateTimeOffsetUtils.FromUnixTimeMicroSeconds(timestamp), - UserHeadersLength = headersLength, - Reserved = reserved - }, - UserHeaders = headers, - RawUserHeaders = rawUserHeaders, - Payload = messagePayload[..payloadSliceLen] - }); - } - finally - { - ArrayPool.Shared.Return(messagePayload); - } + Checksum = checksum, + Id = id, + Offset = offset, + OriginTimestamp = originTimestamp, + PayloadLength = payloadLength, + Timestamp = DateTimeOffsetUtils.FromUnixTimeMicroSeconds(timestamp), + UserHeadersLength = headersLength, + Reserved = reserved + }, + RawUserHeaders = rawHeaders, + Payload = payloadSlice + }); position += 64 + payloadLength + wireHeadersLength; if (position + PropertiesSize >= length) @@ -419,11 +410,54 @@ internal static PolledMessages MapMessages(ReadOnlySpan payload) } } - return new PolledMessages + return new PolledMessagesRental(payloadOwner) { PartitionId = partitionId, CurrentOffset = currentOffset, - Messages = messages.AsReadOnly() + Messages = messages + }; + } + + internal static PolledMessages MaterializeMessages(PolledMessagesRental rental) + { + var messages = new List(rental.Messages.Count); + foreach (var message in rental.Messages) + { + messages.Add(new MessageResponse + { + Header = message.Header, + RawUserHeaders = message.RawUserHeaders.IsEmpty ? null : message.RawUserHeaders.ToArray(), + Payload = message.Payload.ToArray() + }); + } + + return new PolledMessages + { + PartitionId = rental.PartitionId, + CurrentOffset = rental.CurrentOffset, + Messages = messages + }; + } + + internal static PolledMessagesRental ToRentedMessages(PolledMessages messages) + { + var rentedMessages = new List(messages.Messages.Count); + foreach (var message in messages.Messages) + { + rentedMessages.Add(new RentedMessageResponse + { + Header = message.Header, + Payload = message.Payload, + RawUserHeaders = message.RawUserHeaders ?? ReadOnlyMemory.Empty, + UserHeaders = message.UserHeaders + }); + } + + return new PolledMessagesRental(TcpMessageStream.EmptyMemoryOwner.Instance) + { + PartitionId = messages.PartitionId, + CurrentOffset = messages.CurrentOffset, + Messages = rentedMessages }; } @@ -480,36 +514,61 @@ internal static Dictionary MapHeaders(ReadOnlySpan while (position < payload.Length) { if (!TryMapHeaderKind(payload[position], out var keyKind)) + { return null; + } + position++; if (position + 4 > payload.Length) + { return null; + } + var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]); if (keyLength is <= 0 or > 255) + { return null; + } position += 4; if (position + keyLength > payload.Length) + { return null; + } + var keyValue = payload[position..(position + keyLength)].ToArray(); position += keyLength; if (position >= payload.Length) + { return null; + } + if (!TryMapHeaderKind(payload[position], out var valueKind)) + { return null; + } + position++; if (position + 4 > payload.Length) + { return null; + } + var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]); if (valueLength is <= 0 or > 255) + { return null; + } position += 4; if (position + valueLength > payload.Length) + { return null; + } + ReadOnlySpan value = payload[position..(position + valueLength)]; position += valueLength; @@ -550,6 +609,7 @@ private static bool TryMapHeaderKind(byte value, out HeaderKind kind) kind = MapHeaderKind(value); return true; } + kind = default; return false; } diff --git a/foreign/csharp/Iggy_SDK_Tests/ConsumerTests/RentedConsumerTests.cs b/foreign/csharp/Iggy_SDK_Tests/ConsumerTests/RentedConsumerTests.cs new file mode 100644 index 0000000000..93ee877020 --- /dev/null +++ b/foreign/csharp/Iggy_SDK_Tests/ConsumerTests/RentedConsumerTests.cs @@ -0,0 +1,440 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Buffers; +using System.Text; +using Apache.Iggy.Consumers; +using Apache.Iggy.Contracts; +using Apache.Iggy.Encryption; +using Apache.Iggy.Exceptions; +using Apache.Iggy.IggyClient; +using Apache.Iggy.IggyClient.Implementations; +using Apache.Iggy.Kinds; +using Apache.Iggy.Messages; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; + +namespace Apache.Iggy.Tests.ConsumerTests; + +public class RentedConsumerTests +{ + [Fact] + public async Task ReceiveRentedAsync_Should_YieldMessages_WithExpectedPayloads() + { + var owner = new TrackingMemoryOwner(1024); + IReadOnlyList messages = BuildMessages(owner, 3); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 2, + Messages = messages + }; + Mock client = BuildClientMock(new Queue(new[] { rental })); + var consumer = new IggyConsumer(client.Object, BuildConfig(), NullLoggerFactory.Instance); + await consumer.InitAsync(TestContext.Current.CancellationToken); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var got = new List(); + await using (IAsyncEnumerator e = consumer.ReceiveRentedAsync(cts.Token) + .GetAsyncEnumerator(TestContext.Current.CancellationToken)) + { + for (var i = 0; i < 3; i++) + { + Assert.True(await e.MoveNextAsync()); + got.Add(e.Current); + } + } + + Assert.Equal(3, got.Count); + for (var i = 0; i < 3; i++) + { + Assert.Equal(MessageStatus.Success, got[i].Status); + Assert.Equal($"msg-{i}", Encoding.UTF8.GetString(got[i].Message.Payload.Span)); + Assert.Equal((ulong)i, got[i].CurrentOffset); + Assert.Equal(1u, got[i].PartitionId); + Assert.Null(got[i].Error); + } + + // Buffer still rented — none of the messages have been disposed. + Assert.Equal(0, owner.DisposeCount); + + foreach (var m in got) + { + m.Dispose(); + } + + // After disposing every message of the batch, rental returned to pool exactly once. + Assert.Equal(1, owner.DisposeCount); + await consumer.DisposeAsync(); + } + + [Fact] + public async Task ReceiveRentedAsync_Should_NotReturnBuffer_UntilAllMessagesDisposed() + { + var owner = new TrackingMemoryOwner(1024); + IReadOnlyList messages = BuildMessages(owner, 3); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 2, + Messages = messages + }; + Mock client = BuildClientMock(new Queue(new[] { rental })); + var consumer = new IggyConsumer(client.Object, BuildConfig(), NullLoggerFactory.Instance); + await consumer.InitAsync(TestContext.Current.CancellationToken); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var got = new List(); + await using (IAsyncEnumerator e = consumer.ReceiveRentedAsync(cts.Token) + .GetAsyncEnumerator(TestContext.Current.CancellationToken)) + { + for (var i = 0; i < 3; i++) + { + Assert.True(await e.MoveNextAsync()); + got.Add(e.Current); + } + } + + // Dispose only the first two: buffer must still be alive. + got[0].Dispose(); + Assert.Equal(0, owner.DisposeCount); + got[1].Dispose(); + Assert.Equal(0, owner.DisposeCount); + + // Final dispose returns the buffer. + got[2].Dispose(); + Assert.Equal(1, owner.DisposeCount); + + // Calling Dispose again is a no-op — refcount must not drop below zero. + got[0].Dispose(); + got[1].Dispose(); + got[2].Dispose(); + Assert.Equal(1, owner.DisposeCount); + + await consumer.DisposeAsync(); + } + + [Fact] + public async Task ReceiveRentedAsync_Should_YieldDecryptionFailed_AndStillReleaseBuffer_WhenEncryptorThrows() + { + var owner = new TrackingMemoryOwner(1024); + IReadOnlyList messages = BuildMessages(owner, 1); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 0, + Messages = messages + }; + Mock client = BuildClientMock(new Queue(new[] { rental })); + var encryptor = new ThrowingEncryptor(); + var config = BuildConfig(); + config.MessageEncryptor = encryptor; + var consumer = new IggyConsumer(client.Object, config, NullLoggerFactory.Instance); + await consumer.InitAsync(TestContext.Current.CancellationToken); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + ReceivedRentedMessage? got = null; + await using (IAsyncEnumerator e = consumer.ReceiveRentedAsync(cts.Token) + .GetAsyncEnumerator(TestContext.Current.CancellationToken)) + { + Assert.True(await e.MoveNextAsync()); + got = e.Current; + } + + Assert.NotNull(got); + Assert.Equal(MessageStatus.DecryptionFailed, got!.Status); + Assert.IsType(got.Error); + + Assert.Equal(0, owner.DisposeCount); + got.Dispose(); + Assert.Equal(1, owner.DisposeCount); + + await consumer.DisposeAsync(); + } + + [Fact] + public async Task ReceiveRentedAsync_Should_Throw_WhenConsumerNotInitialized() + { + Mock client = BuildClientMock(new Queue()); + var consumer = new IggyConsumer(client.Object, BuildConfig(), NullLoggerFactory.Instance); + + await Assert.ThrowsAsync(async () => + { + await foreach (var _ in consumer.ReceiveRentedAsync(TestContext.Current.CancellationToken)) + { + } + }); + + await consumer.DisposeAsync(); + } + + [Fact] + public void RentedBatchHandle_AcquireRelease_Balanced_DisposesOnce() + { + var owner = new TrackingMemoryOwner(16); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 0, + Messages = Array.Empty() + }; + var handle = new RentedBatchHandle(rental); // refCount=1 (self-ref) + + handle.Acquire(); // 2 + handle.Acquire(); // 3 + Assert.Equal(0, owner.DisposeCount); + + handle.Release(); // 2 + handle.Release(); // 1 (self-ref still alive) + Assert.Equal(0, owner.DisposeCount); + + handle.Release(); // 0 -> disposed + Assert.Equal(1, owner.DisposeCount); + } + + [Fact] + public void PolledMessagesRental_Dispose_Idempotent() + { + var owner = new TrackingMemoryOwner(16); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 0, + Messages = Array.Empty() + }; + + rental.Dispose(); + rental.Dispose(); + rental.Dispose(); + + Assert.Equal(1, owner.DisposeCount); + } + + [Fact] + public async Task PolledMessagesRental_ConcurrentDispose_ReturnsBufferOnce() + { + var owner = new TrackingMemoryOwner(16); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 0, + Messages = Array.Empty() + }; + + using var start = new ManualResetEventSlim(false); + Task[] tasks = Enumerable.Range(0, 64).Select(_ => Task.Run(() => + { + start.Wait(); + rental.Dispose(); + })).ToArray(); + + start.Set(); + await Task.WhenAll(tasks); + + Assert.Equal(1, owner.DisposeCount); + } + + [Fact] + public async Task PollRented_MidLoopPublishFailure_DoesNotLeakBuffer() + { + var owner = new TrackingMemoryOwner(1024); + IReadOnlyList messages = BuildMessages(owner, 5); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 4, + Messages = messages + }; + Mock client = BuildClientMock(new Queue(new[] { rental })); + var consumer + = new FailingPublishConsumer(client.Object, BuildConfig(), NullLoggerFactory.Instance) { FailAfter = 2 }; + await consumer.InitAsync(TestContext.Current.CancellationToken); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var got = new List(); + IAsyncEnumerator enumerator = consumer.ReceiveRentedAsync(cts.Token) + .GetAsyncEnumerator(TestContext.Current.CancellationToken); + + for (var i = 0; i < 2; i++) + { + Assert.True(await enumerator.MoveNextAsync()); + got.Add(enumerator.Current); + } + + // First 2 publishes succeeded, 3rd injected failure aborted the loop. + // Producer self-ref was released in finally; consumer refs (2) still hold the buffer. + Assert.Equal(2, got.Count); + Assert.Equal(0, owner.DisposeCount); + + foreach (var m in got) + { + m.Dispose(); + } + + // All refs released -> rental disposed exactly once. + Assert.Equal(1, owner.DisposeCount); + + cts.Cancel(); + try + { + await enumerator.DisposeAsync(); + } + catch (OperationCanceledException) + { + } + + await consumer.DisposeAsync(); + } + + internal static IggyConsumerConfig BuildConfig() + { + return new IggyConsumerConfig + { + StreamId = Identifier.Numeric(1), + TopicId = Identifier.Numeric(1), + Consumer = Consumer.New(1), + PollingStrategy = PollingStrategy.Next(), + BatchSize = 10, + PartitionId = 1, + AutoCommitMode = AutoCommitMode.Disabled, + AutoCommit = false, + PollingIntervalMs = 0 + }; + } + + /// + /// Slices payload bytes into the supplied owner's memory and returns a list of + /// instances backed by that single rented buffer. + /// + internal static IReadOnlyList BuildMessages(TrackingMemoryOwner owner, int count) + { + var list = new List(count); + Memory buffer = owner.Memory; + var written = 0; + for (var i = 0; i < count; i++) + { + var bytes = Encoding.UTF8.GetBytes($"msg-{i}"); + bytes.CopyTo(buffer.Slice(written, bytes.Length)); + Memory slice = buffer.Slice(written, bytes.Length); + written += bytes.Length; + + list.Add(new RentedMessageResponse + { + Header = new MessageHeader + { + Offset = (ulong)i, + PayloadLength = bytes.Length + }, + Payload = slice, + RawUserHeaders = ReadOnlyMemory.Empty + }); + } + + return list; + } + + /// + /// Builds a that dequeues rentals on each + /// PollMessagesRentedAsync call. When the queue is empty, returns an + /// empty rental so the consumer can spin without dereferencing null. + /// + internal static Mock BuildClientMock(Queue rentals) + { + var mock = new Mock(MockBehavior.Loose); + mock.Setup(c => c.ConnectAsync(It.IsAny())).Returns(Task.CompletedTask); + mock.Setup(c => c.PollMessagesRentedAsync(It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny())) + .ReturnsAsync(() => + { + if (rentals.Count > 0) + { + return rentals.Dequeue(); + } + + return new PolledMessagesRental(TcpMessageStream.EmptyMemoryOwner.Instance) + { + PartitionId = 1, + CurrentOffset = 0, + Messages = Array.Empty() + }; + }); + return mock; + } + + /// + /// wrapper that counts how many times + /// has been invoked, so tests can assert that + /// the rental returns to the pool exactly once. + /// + internal sealed class TrackingMemoryOwner : IMemoryOwner + { + private readonly byte[] _buffer; + + public int DisposeCount { get; private set; } + + public TrackingMemoryOwner(int size) + { + _buffer = new byte[size]; + } + + public Memory Memory => _buffer; + + public void Dispose() + { + DisposeCount++; + } + } + + private sealed class ThrowingEncryptor : IMessageEncryptor + { + public byte[] Encrypt(byte[] plainData) + { + throw new NotSupportedException(); + } + + public byte[] Decrypt(byte[] encryptedData) + { + throw new InvalidOperationException("decrypt fail"); + } + } + + private sealed class FailingPublishConsumer : IggyConsumer + { + private int _calls; + + public int FailAfter { get; set; } + + public FailingPublishConsumer(IIggyClient client, IggyConsumerConfig config, + ILoggerFactory loggerFactory) + : base(client, config, loggerFactory) + { + } + + protected override async Task PublishRentedAsync(RentedBatchHandle rental, RentedMessageResponse message, + uint partitionId, MessageStatus status, Exception? error, CancellationToken ct) + { + if (Interlocked.Increment(ref _calls) > FailAfter) + { + throw new InvalidOperationException("inject publish failure"); + } + + await base.PublishRentedAsync(rental, message, partitionId, status, error, ct); + } + } +} diff --git a/foreign/csharp/Iggy_SDK_Tests/ConsumerTests/RentedTypedConsumerTests.cs b/foreign/csharp/Iggy_SDK_Tests/ConsumerTests/RentedTypedConsumerTests.cs new file mode 100644 index 0000000000..6da059b5b6 --- /dev/null +++ b/foreign/csharp/Iggy_SDK_Tests/ConsumerTests/RentedTypedConsumerTests.cs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Text; +using Apache.Iggy.Consumers; +using Apache.Iggy.Contracts; +using Apache.Iggy.IggyClient; +using Apache.Iggy.Kinds; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; + +namespace Apache.Iggy.Tests.ConsumerTests; + +public class RentedTypedConsumerTests +{ + [Fact] + public async Task ReceiveDeserializedAsync_Should_YieldDeserialized_AndReturnBuffer() + { + var owner = new RentedConsumerTests.TrackingMemoryOwner(1024); + IReadOnlyList messages = RentedConsumerTests.BuildMessages(owner, 3); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 2, + Messages = messages + }; + Mock client + = RentedConsumerTests.BuildClientMock(new Queue(new[] { rental })); + var deserializer = new StringDeserializer(); + var consumer + = new IggyConsumer(client.Object, BuildTypedConfig(deserializer), NullLoggerFactory.Instance); + await consumer.InitAsync(TestContext.Current.CancellationToken); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var got = new List>(); + await using (IAsyncEnumerator> e = consumer + .ReceiveDeserializedAsync(cts.Token) + .GetAsyncEnumerator(TestContext.Current.CancellationToken)) + { + for (var i = 0; i < 3; i++) + { + Assert.True(await e.MoveNextAsync()); + got.Add(e.Current); + } + } + + Assert.Equal(3, got.Count); + for (var i = 0; i < 3; i++) + { + Assert.Equal(MessageStatus.Success, got[i].Status); + Assert.Equal($"msg-{i}", got[i].Data); + Assert.Equal((ulong)i, got[i].CurrentOffset); + Assert.Equal(1u, got[i].PartitionId); + Assert.Null(got[i].Error); + } + + // Entire batch deserialized before first yield; rental already returned to pool. + Assert.Equal(1, owner.DisposeCount); + await consumer.DisposeAsync(); + } + + [Fact] + public async Task ReceiveDeserializedAsync_Should_ReleaseEntireBatch_BeforeFirstYield() + { + var owner = new RentedConsumerTests.TrackingMemoryOwner(1024); + IReadOnlyList messages = RentedConsumerTests.BuildMessages(owner, 3); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 2, + Messages = messages + }; + Mock client + = RentedConsumerTests.BuildClientMock(new Queue(new[] { rental })); + var deserializer = new StringDeserializer(); + var consumer + = new IggyConsumer(client.Object, BuildTypedConfig(deserializer), NullLoggerFactory.Instance); + await consumer.InitAsync(TestContext.Current.CancellationToken); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await using IAsyncEnumerator> e = consumer.ReceiveDeserializedAsync(cts.Token) + .GetAsyncEnumerator(TestContext.Current.CancellationToken); + + Assert.True(await e.MoveNextAsync()); + Assert.Equal(1, owner.DisposeCount); + Assert.Equal("msg-0", e.Current.Data); + + // Remaining messages come from the pre-deserialized list — no further disposal needed. + Assert.True(await e.MoveNextAsync()); + Assert.Equal(1, owner.DisposeCount); + Assert.Equal("msg-1", e.Current.Data); + + Assert.True(await e.MoveNextAsync()); + Assert.Equal(1, owner.DisposeCount); + Assert.Equal("msg-2", e.Current.Data); + + await consumer.DisposeAsync(); + } + + [Fact] + public async Task DeserializerThrows_Should_YieldDeserializationFailed_AndStillReturnBuffer() + { + var owner = new RentedConsumerTests.TrackingMemoryOwner(1024); + IReadOnlyList messages = RentedConsumerTests.BuildMessages(owner, 1); + var rental = new PolledMessagesRental(owner) + { + PartitionId = 1, + CurrentOffset = 0, + Messages = messages + }; + Mock client + = RentedConsumerTests.BuildClientMock(new Queue(new[] { rental })); + var deserializer = new StringDeserializer { ThrowOnNext = true }; + var consumer + = new IggyConsumer(client.Object, BuildTypedConfig(deserializer), NullLoggerFactory.Instance); + await consumer.InitAsync(TestContext.Current.CancellationToken); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + ReceivedMessage? got = null; + await using (IAsyncEnumerator> e = consumer + .ReceiveDeserializedAsync(cts.Token) + .GetAsyncEnumerator(TestContext.Current.CancellationToken)) + { + Assert.True(await e.MoveNextAsync()); + got = e.Current; + } + + Assert.NotNull(got); + Assert.Equal(MessageStatus.DeserializationFailed, got!.Status); + Assert.Null(got.Data); + Assert.IsType(got.Error); + + // Even on deserialization failure, the using-block inside the typed consumer releases the rental. + Assert.Equal(1, owner.DisposeCount); + await consumer.DisposeAsync(); + } + + private static IggyConsumerConfig BuildTypedConfig(IDeserializer deserializer) + { + return new IggyConsumerConfig + { + StreamId = Identifier.Numeric(1), + TopicId = Identifier.Numeric(1), + Consumer = Consumer.New(1), + PollingStrategy = PollingStrategy.Next(), + BatchSize = 10, + PartitionId = 1, + AutoCommitMode = AutoCommitMode.Disabled, + AutoCommit = false, + PollingIntervalMs = 0, + Deserializer = deserializer + }; + } + + private sealed class StringDeserializer : IDeserializer + { + public bool ThrowOnNext { get; set; } + + public string Deserialize(ReadOnlyMemory data) + { + if (ThrowOnNext) + { + throw new InvalidOperationException("deserialize fail"); + } + + return Encoding.UTF8.GetString(data.Span); + } + } +} diff --git a/foreign/csharp/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs b/foreign/csharp/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs index 234f5239b4..580d905f8f 100644 --- a/foreign/csharp/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs +++ b/foreign/csharp/Iggy_SDK_Tests/MapperTests/BinaryMapper.cs @@ -19,6 +19,7 @@ using Apache.Iggy.Contracts.Auth; using Apache.Iggy.Enums; using Apache.Iggy.Extensions; +using Apache.Iggy.IggyClient.Implementations; using Apache.Iggy.Tests.Utils; using Apache.Iggy.Tests.Utils.Groups; using Apache.Iggy.Tests.Utils.Messages; @@ -84,7 +85,8 @@ public void MapMessages_NoHeaders_ReturnsValidMessageResponses() msgTwoPayload.CopyTo(combinedPayload.AsSpan(16 + msgOnePayload.Length)); // Act - var responses = Mappers.BinaryMapper.MapMessages(combinedPayload); + var responses + = Mappers.BinaryMapper.MapRentedMessages(combinedPayload, TcpMessageStream.EmptyMemoryOwner.Instance); // Assert Assert.NotNull(responses); diff --git a/foreign/csharp/Iggy_SDK_Tests/UtilityTests/SlicedMemoryOwnerTests.cs b/foreign/csharp/Iggy_SDK_Tests/UtilityTests/SlicedMemoryOwnerTests.cs new file mode 100644 index 0000000000..73e9ba3b9c --- /dev/null +++ b/foreign/csharp/Iggy_SDK_Tests/UtilityTests/SlicedMemoryOwnerTests.cs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Buffers; +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using Apache.Iggy.IggyClient.Implementations; + +namespace Apache.Iggy.Tests.UtilityTests; + +public class SlicedMemoryOwnerTests +{ + private const int BufferSize = 4096; + + [Fact] + public void Dispose_ReturnsBufferToPool() + { + var owner = ArrayPoolHelper.Rent(BufferSize); + byte[] underlying = GetUnderlyingArray(owner); + owner.Dispose(); + + using var second = ArrayPoolHelper.Rent(BufferSize); + Assert.Same(underlying, GetUnderlyingArray(second)); + } + + [Fact] + public void Dispose_IsIdempotent() + { + var owner = ArrayPoolHelper.Rent(BufferSize); + owner.Dispose(); + owner.Dispose(); + owner.Dispose(); + } + + [Fact] + public void FinalizerIsDeclared() + { + Type sliced = typeof(ArrayPoolHelper) + .GetNestedType("SlicedMemoryOwner", BindingFlags.NonPublic)!; + + MethodInfo? finalizer = sliced.GetMethod("Finalize", BindingFlags.NonPublic | BindingFlags.Instance); + + Assert.NotNull(finalizer); + } + + [Fact] + public void ForgotDispose_FinalizerRunsAndReclaimsInstance() + { + WeakReference weakRef = RentWeak(); + + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + Assert.False(weakRef.IsAlive); + + [MethodImpl(MethodImplOptions.NoInlining)] + static WeakReference RentWeak() => new(ArrayPoolHelper.Rent(BufferSize)); + } + + private static byte[] GetUnderlyingArray(IMemoryOwner owner) + { + if (!MemoryMarshal.TryGetArray(owner.Memory, out var segment) || segment.Array is null) + { + throw new InvalidOperationException("SlicedMemoryOwner.Memory must be array-backed."); + } + + return segment.Array; + } +}