From 4eb2b062e2df81e60cb2c0d79cacfd5db1da788c Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sat, 19 Dec 2020 16:22:24 +0000 Subject: [PATCH 01/28] Added missing reference to Microsoft.Bcl.AsyncInterfaces in samples and tests --- samples/Consuming/Consuming.csproj | 1 + samples/Producing/Producing.csproj | 1 + samples/Reading/Reading.csproj | 1 + tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj | 1 + 4 files changed, 4 insertions(+) diff --git a/samples/Consuming/Consuming.csproj b/samples/Consuming/Consuming.csproj index 6e05a4d98..428528e81 100644 --- a/samples/Consuming/Consuming.csproj +++ b/samples/Consuming/Consuming.csproj @@ -6,6 +6,7 @@ + diff --git a/samples/Producing/Producing.csproj b/samples/Producing/Producing.csproj index 6e05a4d98..428528e81 100644 --- a/samples/Producing/Producing.csproj +++ b/samples/Producing/Producing.csproj @@ -6,6 +6,7 @@ + diff --git a/samples/Reading/Reading.csproj b/samples/Reading/Reading.csproj index 6e05a4d98..428528e81 100644 --- a/samples/Reading/Reading.csproj +++ b/samples/Reading/Reading.csproj @@ -6,6 +6,7 @@ + diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj index bdf1d1a1a..692758095 100644 --- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj +++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj @@ -7,6 +7,7 @@ + all From dfb864adefe19e1b62f67bf1d141ee590521dce8 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 20 Dec 2020 15:49:12 +0000 Subject: [PATCH 02/28] nack delay option --- src/DotPulsar/ConsumerOptions.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs index 3d12d9f55..33fd94568 100644 --- a/src/DotPulsar/ConsumerOptions.cs +++ b/src/DotPulsar/ConsumerOptions.cs @@ -101,5 +101,11 @@ public ConsumerOptions(string subscriptionName, string topic) /// Set the topic for this consumer. This is required. /// public string Topic { get; set; } + + /// + /// Delay to wait before redelivering messages that failed to be processed. + /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. + /// + public ulong NegativeAckRedeliveryDelayMicros { get; set; } } } From 370e1c813a46c77cf5b0686a8d6e73d63c72aebc Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 20 Dec 2020 15:52:28 +0000 Subject: [PATCH 03/28] ack timeout config option --- src/DotPulsar/ConsumerOptions.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs index 33fd94568..bda080eaa 100644 --- a/src/DotPulsar/ConsumerOptions.cs +++ b/src/DotPulsar/ConsumerOptions.cs @@ -107,5 +107,10 @@ public ConsumerOptions(string subscriptionName, string topic) /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. /// public ulong NegativeAckRedeliveryDelayMicros { get; set; } + + /// + /// Timeout of unacked messages + /// + public ulong AckTimeoutMillis { get; set; } } } From 278005f82444d5323af3aca7477f42c6bc692543 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 20 Dec 2020 17:57:31 +0000 Subject: [PATCH 04/28] naive implementation tracker and message queue --- .../Abstractions/IMessageAcksTracker.cs | 10 ++ .../Internal/Abstractions/IMessageQueue.cs | 9 ++ src/DotPulsar/Internal/MessageAcksTracker.cs | 99 +++++++++++++++++++ src/DotPulsar/Internal/MessageQueue.cs | 33 +++++++ 4 files changed, 151 insertions(+) create mode 100644 src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs create mode 100644 src/DotPulsar/Internal/Abstractions/IMessageQueue.cs create mode 100644 src/DotPulsar/Internal/MessageAcksTracker.cs create mode 100644 src/DotPulsar/Internal/MessageQueue.cs diff --git a/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs b/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs new file mode 100644 index 000000000..a02091675 --- /dev/null +++ b/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs @@ -0,0 +1,10 @@ +namespace DotPulsar.Internal.Abstractions +{ + using System.Threading.Tasks; + public interface IMessageAcksTracker + { + T Add(T message); + T Ack(T message); + T Nack(T message); + } +} \ No newline at end of file diff --git a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs new file mode 100644 index 000000000..ca800ac74 --- /dev/null +++ b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs @@ -0,0 +1,9 @@ +namespace DotPulsar.Internal.Abstractions +{ + using System.Threading.Tasks; + public interface IMessageQueue : IDequeue + { + T Acknowledge(T obj); + T NegativeAcknowledge(T obj); + } +} \ No newline at end of file diff --git a/src/DotPulsar/Internal/MessageAcksTracker.cs b/src/DotPulsar/Internal/MessageAcksTracker.cs new file mode 100644 index 000000000..2ff374c4a --- /dev/null +++ b/src/DotPulsar/Internal/MessageAcksTracker.cs @@ -0,0 +1,99 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using DotPulsar.Exceptions; + using Events; + using Microsoft.Extensions.ObjectPool; + using PulsarApi; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + using System.Diagnostics; + + internal class Tracker + { + private readonly Stopwatch _timer; + private long maxTimeoutMs; + + public Tracker(long timeoutMs) + { + maxTimeoutMs = timeoutMs; + _timer = new Stopwatch(); + _timer.Start(); + } + + public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs; + + public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds; + + public void Reset(long newTimeoutMs) + { + maxTimeoutMs = newTimeoutMs; + _timer.Restart(); + } + } + + public sealed class MessageAcksTracker : IMessageAcksTracker + { + private readonly Dictionary _trackers; + private long _unackedTimeoutMs; + private long _nackTimeoutMs; + private int _trackerDelayMs; + public MessageAcksTracker() + { + _trackers = new Dictionary(); + } + + public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken) + { + await Task.Yield(); + + while (true) + { + await Task.Delay(_trackerDelayMs); + + var messageIds = new List(); + foreach (KeyValuePair p in _trackers) + { + if (p.Value.IsTimedOut()) + messageIds.Add(p.Key); + } + + if (messageIds.Count() > 0) + await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false); + + } + } + public MessageId Add(MessageId message) + { + if (!_trackers.ContainsKey(message)) + { + _trackers.Add(message, new Tracker(_unackedTimeoutMs)); + } + + return message; + } + public MessageId Ack(MessageId message) + { + if (_trackers.ContainsKey(message)) + _trackers.Remove(message); + return message; + } + public MessageId Nack(MessageId message) + { + if (_trackers.ContainsKey(message)) + { + var timer = _trackers[message]; + if (timer.msTillTimeout > _nackTimeoutMs) + timer.Reset(_nackTimeoutMs); + } + else + _trackers.Add(message, new Tracker(_nackTimeoutMs)); + return message; + } + } +} \ No newline at end of file diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs new file mode 100644 index 000000000..62f583535 --- /dev/null +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -0,0 +1,33 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using DotPulsar.Exceptions; + using Events; + using Microsoft.Extensions.ObjectPool; + using PulsarApi; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + + public sealed class MessageQueue : IMessageQueue + { + private readonly AsyncQueue _queue; + private readonly IMessageAcksTracker _tracker; + public MessageQueue(AsyncQueue queue, IMessageAcksTracker tracker) + { + _queue = queue; + _tracker = tracker; + } + public async ValueTask Dequeue(CancellationToken cancellationToken = default) + { + var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); + return _tracker.Add(message); + } + public MessageId Acknowledge(MessageId obj) => _tracker.Ack(obj); + public MessageId NegativeAcknowledge(MessageId obj) => _tracker.Nack(obj); + } +} \ No newline at end of file From 79bc9061d3a1676307c239793a8df67134a9816f Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 20 Dec 2020 18:05:27 +0000 Subject: [PATCH 05/28] use message package for queue --- src/DotPulsar/Internal/MessageQueue.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs index 62f583535..d8540ba37 100644 --- a/src/DotPulsar/Internal/MessageQueue.cs +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -15,9 +15,9 @@ namespace DotPulsar.Internal public sealed class MessageQueue : IMessageQueue { - private readonly AsyncQueue _queue; + private readonly AsyncQueue _queue; private readonly IMessageAcksTracker _tracker; - public MessageQueue(AsyncQueue queue, IMessageAcksTracker tracker) + public MessageQueue(AsyncQueue queue, IMessageAcksTracker tracker) { _queue = queue; _tracker = tracker; @@ -25,7 +25,7 @@ public MessageQueue(AsyncQueue queue, IMessageAcksTracker public async ValueTask Dequeue(CancellationToken cancellationToken = default) { var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); - return _tracker.Add(message); + return _tracker.Add(new MessageId(message.MessageId)); } public MessageId Acknowledge(MessageId obj) => _tracker.Ack(obj); public MessageId NegativeAcknowledge(MessageId obj) => _tracker.Nack(obj); From 546d9e970c0bb960ee259e40fb4b2c5bce167e94 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Mon, 21 Dec 2020 14:29:37 +0000 Subject: [PATCH 06/28] use int for ms values --- src/DotPulsar/ConsumerOptions.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs index bda080eaa..2c4c2bdfb 100644 --- a/src/DotPulsar/ConsumerOptions.cs +++ b/src/DotPulsar/ConsumerOptions.cs @@ -106,11 +106,11 @@ public ConsumerOptions(string subscriptionName, string topic) /// Delay to wait before redelivering messages that failed to be processed. /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. /// - public ulong NegativeAckRedeliveryDelayMicros { get; set; } + public int NegativeAckRedeliveryDelayMicros { get; set; } /// /// Timeout of unacked messages /// - public ulong AckTimeoutMillis { get; set; } + public int AckTimeoutMillis { get; set; } } } From 5e3cea6aa0e54fe1325432ea2a514d63826d864c Mon Sep 17 00:00:00 2001 From: dionjansen Date: Mon, 21 Dec 2020 14:30:28 +0000 Subject: [PATCH 07/28] Non generic interface --- src/DotPulsar/Internal/Abstractions/IMessageQueue.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs index ca800ac74..44a1fae31 100644 --- a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs +++ b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs @@ -1,9 +1,9 @@ namespace DotPulsar.Internal.Abstractions { using System.Threading.Tasks; - public interface IMessageQueue : IDequeue + public interface IMessageQueue { - T Acknowledge(T obj); - T NegativeAcknowledge(T obj); + MessageId Acknowledge(MessageId obj); + MessageId NegativeAcknowledge(MessageId obj); } } \ No newline at end of file From 19931bc915d95d2aebf259fd41bbc9b04e0ce27f Mon Sep 17 00:00:00 2001 From: dionjansen Date: Mon, 21 Dec 2020 14:59:11 +0000 Subject: [PATCH 08/28] Added basic implementation for tracker, integrated messagequeue in consumer channel and factory --- .../Abstractions/IMessageAcksTracker.cs | 3 ++ src/DotPulsar/Internal/ConsumerChannel.cs | 4 +-- .../Internal/ConsumerChannelFactory.cs | 11 +++++-- .../Internal/InactiveMessageAcksTracker.cs | 30 +++++++++++++++++++ src/DotPulsar/Internal/MessageAcksTracker.cs | 18 ++++++----- src/DotPulsar/Internal/MessageQueue.cs | 13 ++++++-- .../Internal/ReaderChannelFactory.cs | 4 ++- .../Internal/MessageAcksTrackerTests.cs | 18 +++++++++++ 8 files changed, 86 insertions(+), 15 deletions(-) create mode 100644 src/DotPulsar/Internal/InactiveMessageAcksTracker.cs create mode 100644 tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs diff --git a/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs b/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs index a02091675..b5de8a204 100644 --- a/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs +++ b/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs @@ -1,10 +1,13 @@ namespace DotPulsar.Internal.Abstractions { + using DotPulsar.Abstractions; + using System.Threading; using System.Threading.Tasks; public interface IMessageAcksTracker { T Add(T message); T Ack(T message); T Nack(T message); + Task StartTracker(IConsumer consumer, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs index 02c931640..52fb1ec5e 100644 --- a/src/DotPulsar/Internal/ConsumerChannel.cs +++ b/src/DotPulsar/Internal/ConsumerChannel.cs @@ -24,7 +24,7 @@ namespace DotPulsar.Internal public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel { private readonly ulong _id; - private readonly AsyncQueue _queue; + private readonly MessageQueue _queue; private readonly IConnection _connection; private readonly BatchHandler _batchHandler; private readonly CommandFlow _cachedCommandFlow; @@ -35,7 +35,7 @@ public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel public ConsumerChannel( ulong id, uint messagePrefetchCount, - AsyncQueue queue, + MessageQueue queue, IConnection connection, BatchHandler batchHandler) { diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs index 5268264d5..5dadf49a3 100644 --- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs +++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs @@ -28,20 +28,25 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory private readonly IExecute _executor; private readonly CommandSubscribe _subscribe; private readonly uint _messagePrefetchCount; + private readonly int _ackTimeoutMillis; + private readonly int _negativeAckRedeliveryDelayMicros; private readonly BatchHandler _batchHandler; + private readonly IMessageAcksTracker _tracker; public ConsumerChannelFactory( Guid correlationId, IRegisterEvent eventRegister, IConnectionPool connectionPool, IExecute executor, - ConsumerOptions options) + ConsumerOptions options, + IMessageAcksTracker tracker) { _correlationId = correlationId; _eventRegister = eventRegister; _connectionPool = connectionPool; _executor = executor; _messagePrefetchCount = options.MessagePrefetchCount; + _tracker = tracker; _subscribe = new CommandSubscribe { @@ -64,9 +69,11 @@ private async ValueTask GetChannel(CancellationToken cancellat { var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false); var messageQueue = new AsyncQueue(); + // TODO perhaps start tracker here? + var consumerMessageQueue = new MessageQueue(messageQueue, _tracker); var channel = new Channel(_correlationId, _eventRegister, messageQueue); var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false); - return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler); + return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler); } } } diff --git a/src/DotPulsar/Internal/InactiveMessageAcksTracker.cs b/src/DotPulsar/Internal/InactiveMessageAcksTracker.cs new file mode 100644 index 000000000..8edb7808c --- /dev/null +++ b/src/DotPulsar/Internal/InactiveMessageAcksTracker.cs @@ -0,0 +1,30 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using DotPulsar.Exceptions; + using Events; + using Microsoft.Extensions.ObjectPool; + using PulsarApi; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + using System.Diagnostics; + + public sealed class InactiveMessageAcksTracker : IMessageAcksTracker + { + public InactiveMessageAcksTracker() { } + + public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken) + { + await Task.Yield(); + } + + public MessageId Add(MessageId message) => message; + public MessageId Ack(MessageId message) => message; + public MessageId Nack(MessageId message) => message; + } +} \ No newline at end of file diff --git a/src/DotPulsar/Internal/MessageAcksTracker.cs b/src/DotPulsar/Internal/MessageAcksTracker.cs index 2ff374c4a..95c65fc0f 100644 --- a/src/DotPulsar/Internal/MessageAcksTracker.cs +++ b/src/DotPulsar/Internal/MessageAcksTracker.cs @@ -17,9 +17,9 @@ namespace DotPulsar.Internal internal class Tracker { private readonly Stopwatch _timer; - private long maxTimeoutMs; + private int maxTimeoutMs; - public Tracker(long timeoutMs) + public Tracker(int timeoutMs) { maxTimeoutMs = timeoutMs; _timer = new Stopwatch(); @@ -30,21 +30,25 @@ public Tracker(long timeoutMs) public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds; - public void Reset(long newTimeoutMs) + public void Reset(int newTimeoutMs) { maxTimeoutMs = newTimeoutMs; _timer.Restart(); } } + // TODO add mechnism to stop tracker when disposed public sealed class MessageAcksTracker : IMessageAcksTracker { private readonly Dictionary _trackers; - private long _unackedTimeoutMs; - private long _nackTimeoutMs; - private int _trackerDelayMs; - public MessageAcksTracker() + private readonly int _unackedTimeoutMs; + private readonly int _nackTimeoutMs; + private readonly int _trackerDelayMs; + public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs) { + _unackedTimeoutMs = unackedTimeoutMs; + _nackTimeoutMs = nackTimeoutMs; + _trackerDelayMs = trackerDelayMs; _trackers = new Dictionary(); } diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs index d8540ba37..b328ab6b4 100644 --- a/src/DotPulsar/Internal/MessageQueue.cs +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -13,7 +13,7 @@ namespace DotPulsar.Internal using System.Threading; using System.Threading.Tasks; - public sealed class MessageQueue : IMessageQueue + public sealed class MessageQueue : IMessageQueue, IDequeue, IDisposable { private readonly AsyncQueue _queue; private readonly IMessageAcksTracker _tracker; @@ -22,12 +22,19 @@ public MessageQueue(AsyncQueue queue, IMessageAcksTracker Dequeue(CancellationToken cancellationToken = default) + public async ValueTask Dequeue(CancellationToken cancellationToken = default) { var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); - return _tracker.Add(new MessageId(message.MessageId)); + _tracker.Add(new MessageId(message.MessageId)); + return message; } public MessageId Acknowledge(MessageId obj) => _tracker.Ack(obj); public MessageId NegativeAcknowledge(MessageId obj) => _tracker.Nack(obj); + + public void Dispose() + { + _queue.Dispose(); + // TODO dispose tracker + } } } \ No newline at end of file diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs index 9efe033e4..dcb5ea75a 100644 --- a/src/DotPulsar/Internal/ReaderChannelFactory.cs +++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs @@ -63,9 +63,11 @@ private async ValueTask GetChannel(CancellationToken cancellatio { var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false); var messageQueue = new AsyncQueue(); + var tracker = new InactiveMessageAcksTracker(); // No tracker for reader since readers don't ack. + var consumerMessageQueue = new MessageQueue(messageQueue, tracker); var channel = new Channel(_correlationId, _eventRegister, messageQueue); var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false); - return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler); + return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler); } } } diff --git a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs new file mode 100644 index 000000000..8a06a76db --- /dev/null +++ b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs @@ -0,0 +1,18 @@ +namespace DotPulsar.Tests.Internal +{ + using DotPulsar.Internal; + using FluentAssertions; + using System.Buffers; + using System.Linq; + using Xunit; + + public class MessageAcksTrackerTests + { + [Fact] + public void Test_Instance() + { + var tracker = new MessageAcksTracker(1, 2, 3); + tracker.Should().BeOfType(); + } + } +} \ No newline at end of file From 295eab882c5e633b16f4a68ad7e66a84784c0da3 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Mon, 21 Dec 2020 15:00:17 +0000 Subject: [PATCH 09/28] Create conditional tracker in client --- src/DotPulsar/PulsarClient.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs index e2c0577f2..95dba18df 100644 --- a/src/DotPulsar/PulsarClient.cs +++ b/src/DotPulsar/PulsarClient.cs @@ -74,11 +74,19 @@ public IConsumer CreateConsumer(ConsumerOptions options) ThrowIfDisposed(); var correlationId = Guid.NewGuid(); var executor = new Executor(correlationId, _processManager, _exceptionHandler); - var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options); + var stateManager = new StateManager(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted); var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager); if (options.StateChangedHandler is not null) _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler); + IMessageAcksTracker tracker = new InactiveMessageAcksTracker(); + if (options.AckTimeoutMillis > 0 || options.NegativeAckRedeliveryDelayMicros > 0) + // TODO polling interval from options + tracker = new MessageAcksTracker(options.AckTimeoutMillis, options.NegativeAckRedeliveryDelayMicros, 1000); + // TODO + // handle cancellation + _ = tracker.StartTracker(consumer, new CancellationTokenSource().Token); + var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, tracker); var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover); _processManager.Add(process); process.Start(); From 2be511428a1910585ae1b68aa5f2c81f865c06ea Mon Sep 17 00:00:00 2001 From: dionjansen Date: Mon, 21 Dec 2020 15:38:04 +0000 Subject: [PATCH 10/28] Inform queue when acking --- src/DotPulsar/Internal/ConsumerChannel.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs index 52fb1ec5e..da172dc87 100644 --- a/src/DotPulsar/Internal/ConsumerChannel.cs +++ b/src/DotPulsar/Internal/ConsumerChannel.cs @@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken) } command.ConsumerId = _id; + + _queue.Acknowledge(new MessageId(messageId)); + await _connection.Send(command, cancellationToken).ConfigureAwait(false); } From 163ea4591e36768ba1040414169d91c76434545a Mon Sep 17 00:00:00 2001 From: dionjansen Date: Tue, 22 Dec 2020 15:22:44 +0100 Subject: [PATCH 11/28] Cleanup references for VSCode --- samples/Consuming/Consuming.csproj | 1 - samples/Producing/Producing.csproj | 1 - samples/Reading/Reading.csproj | 1 - 3 files changed, 3 deletions(-) diff --git a/samples/Consuming/Consuming.csproj b/samples/Consuming/Consuming.csproj index 428528e81..6e05a4d98 100644 --- a/samples/Consuming/Consuming.csproj +++ b/samples/Consuming/Consuming.csproj @@ -6,7 +6,6 @@ - diff --git a/samples/Producing/Producing.csproj b/samples/Producing/Producing.csproj index 428528e81..6e05a4d98 100644 --- a/samples/Producing/Producing.csproj +++ b/samples/Producing/Producing.csproj @@ -6,7 +6,6 @@ - diff --git a/samples/Reading/Reading.csproj b/samples/Reading/Reading.csproj index 428528e81..6e05a4d98 100644 --- a/samples/Reading/Reading.csproj +++ b/samples/Reading/Reading.csproj @@ -6,7 +6,6 @@ - From 7da197ec4a39f18c002ff9a4a8cc3ee34a674e70 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 23 Dec 2020 18:28:00 +0100 Subject: [PATCH 12/28] Unacked tracker only --- .../Internal/Abstractions/IMessageQueue.cs | 9 +- .../Abstractions/IUnackedMessageTracker.cs | 15 + .../Internal/ConsumerChannelFactory.cs | 5 +- .../Internal/InactiveUnackedMessageTracker.cs | 30 + src/DotPulsar/Internal/MessageQueue.cs | 15 +- .../Internal/UnackedMessageTracker.cs | 104 +++ src/DotPulsar/PulsarClient.cs | 14 +- src/DotPulsar/mono_crash.11a6748113.0.json | 731 ++++++++++++++++++ 8 files changed, 902 insertions(+), 21 deletions(-) create mode 100644 src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs create mode 100644 src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs create mode 100644 src/DotPulsar/Internal/UnackedMessageTracker.cs create mode 100644 src/DotPulsar/mono_crash.11a6748113.0.json diff --git a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs index 44a1fae31..0b9f2fdd5 100644 --- a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs +++ b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs @@ -1,9 +1,10 @@ namespace DotPulsar.Internal.Abstractions { - using System.Threading.Tasks; - public interface IMessageQueue + using System; + + public interface IMessageQueue : IDequeue, IDisposable { - MessageId Acknowledge(MessageId obj); - MessageId NegativeAcknowledge(MessageId obj); + void Acknowledge(MessageId obj); + void NegativeAcknowledge(MessageId obj); } } \ No newline at end of file diff --git a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs new file mode 100644 index 000000000..ca2b703c5 --- /dev/null +++ b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs @@ -0,0 +1,15 @@ +namespace DotPulsar.Internal.Abstractions +{ + using DotPulsar.Abstractions; + using System.Threading.Tasks; + using System; + + public interface IUnackedMessageTracker : IDisposable + { + void Add(MessageId messageId); + + void Ack(MessageId messageId); + + Task Start(IConsumer consumer); + } +} diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs index 5dadf49a3..5b0c05b57 100644 --- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs +++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs @@ -31,7 +31,7 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory private readonly int _ackTimeoutMillis; private readonly int _negativeAckRedeliveryDelayMicros; private readonly BatchHandler _batchHandler; - private readonly IMessageAcksTracker _tracker; + private readonly IUnackedMessageTracker _tracker; public ConsumerChannelFactory( Guid correlationId, @@ -39,7 +39,7 @@ public ConsumerChannelFactory( IConnectionPool connectionPool, IExecute executor, ConsumerOptions options, - IMessageAcksTracker tracker) + IUnackedMessageTracker tracker) { _correlationId = correlationId; _eventRegister = eventRegister; @@ -69,7 +69,6 @@ private async ValueTask GetChannel(CancellationToken cancellat { var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false); var messageQueue = new AsyncQueue(); - // TODO perhaps start tracker here? var consumerMessageQueue = new MessageQueue(messageQueue, _tracker); var channel = new Channel(_correlationId, _eventRegister, messageQueue); var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false); diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs new file mode 100644 index 000000000..07fd69f1b --- /dev/null +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -0,0 +1,30 @@ +namespace DotPulsar.Internal +{ + using System.Threading.Tasks; + using Abstractions; + using DotPulsar.Abstractions; + + public class InactiveUnackedMessageTracker : IUnackedMessageTracker + { + public InactiveUnackedMessageTracker() + { + } + + public void Ack(MessageId messageId) + { + return; + } + + public void Add(MessageId messageId) + { + return; + } + + public Task Start(IConsumer consumer) => Task.CompletedTask; + + public void Dispose() { + return; + } + + } +} diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs index b328ab6b4..565a70480 100644 --- a/src/DotPulsar/Internal/MessageQueue.cs +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -16,8 +16,8 @@ namespace DotPulsar.Internal public sealed class MessageQueue : IMessageQueue, IDequeue, IDisposable { private readonly AsyncQueue _queue; - private readonly IMessageAcksTracker _tracker; - public MessageQueue(AsyncQueue queue, IMessageAcksTracker tracker) + private readonly IUnackedMessageTracker _tracker; + public MessageQueue(AsyncQueue queue, IUnackedMessageTracker tracker) { _queue = queue; _tracker = tracker; @@ -28,13 +28,18 @@ public async ValueTask Dequeue(CancellationToken cancellationTok _tracker.Add(new MessageId(message.MessageId)); return message; } - public MessageId Acknowledge(MessageId obj) => _tracker.Ack(obj); - public MessageId NegativeAcknowledge(MessageId obj) => _tracker.Nack(obj); + public void Acknowledge(MessageId obj) => _tracker.Ack(obj); + + public void NegativeAcknowledge(MessageId obj) + { + throw new NotImplementedException(); + } public void Dispose() { _queue.Dispose(); - // TODO dispose tracker + _tracker.Dispose(); } + } } \ No newline at end of file diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnackedMessageTracker.cs new file mode 100644 index 000000000..299dfa1e2 --- /dev/null +++ b/src/DotPulsar/Internal/UnackedMessageTracker.cs @@ -0,0 +1,104 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using System; + using System.Collections.Concurrent; + using System.Diagnostics; + using System.Linq; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + public readonly struct AwaitingAck + { + public MessageId MessageId { get; } + public long Timestamp { get; } + + public AwaitingAck(MessageId messageId) + { + MessageId = messageId; + Timestamp = Stopwatch.GetTimestamp(); + } + + public TimeSpan Elapsed => + TimeSpan.FromTicks( + (Stopwatch.GetTimestamp() - Timestamp) / + (Stopwatch.Frequency) * 1000); + } + + public sealed class UnackedMessageTracker : IUnackedMessageTracker + { + private readonly TimeSpan _ackTimeout; + private readonly TimeSpan _pollingTimeout; + private readonly ConcurrentQueue _awaitingAcks; + private readonly List _acked; + private readonly CancellationTokenSource _cancellationTokenSource; + + + public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout) + { + _ackTimeout = ackTimeout; + _pollingTimeout = pollingTimeout; + _awaitingAcks = new ConcurrentQueue(); + _acked = new List(); + _cancellationTokenSource = new CancellationTokenSource(); + } + + public void Add(MessageId messageId) + { + _awaitingAcks.Enqueue(new AwaitingAck(messageId)); + } + + public void Ack(MessageId messageId) + { + // We only need to store the highest cumulative ack we see (if there is one) + // and the MessageIds not included by that cumulative ack. + _acked.Add(messageId); + } + + public Task Start(IConsumer consumer) + { + var cancellationToken = _cancellationTokenSource.Token; + + return Task.Run(async () => { + while (!cancellationToken.IsCancellationRequested) + { + var messages = CheckUnackedMessages(); + + if (messages.Count() > 0) + await consumer.RedeliverUnacknowledgedMessages(messages, cancellationToken); + + await Task.Delay(_pollingTimeout, cancellationToken); + } + }, cancellationToken); + } + + private IEnumerable CheckUnackedMessages() + { + AwaitingAck awaiting; + var result = new List(); + + while (_awaitingAcks.TryPeek(out awaiting) + && awaiting.Elapsed > _ackTimeout) + { + // Can I safely use Dequeue now instead of TryDequeue? + if (_awaitingAcks.TryDequeue(out awaiting)) + { + //If the MessageId is not acknowledged + if (!_acked.Contains(awaiting.MessageId)) + result.Add(awaiting.MessageId); + else + _acked.Remove(awaiting.MessageId); + } + } + + return result; + } + + public void Dispose() + { + this._cancellationTokenSource.Cancel(); + } + } +} diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs index 95dba18df..f1817ebf9 100644 --- a/src/DotPulsar/PulsarClient.cs +++ b/src/DotPulsar/PulsarClient.cs @@ -74,19 +74,15 @@ public IConsumer CreateConsumer(ConsumerOptions options) ThrowIfDisposed(); var correlationId = Guid.NewGuid(); var executor = new Executor(correlationId, _processManager, _exceptionHandler); - var stateManager = new StateManager(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted); var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager); if (options.StateChangedHandler is not null) _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler); - IMessageAcksTracker tracker = new InactiveMessageAcksTracker(); - if (options.AckTimeoutMillis > 0 || options.NegativeAckRedeliveryDelayMicros > 0) - // TODO polling interval from options - tracker = new MessageAcksTracker(options.AckTimeoutMillis, options.NegativeAckRedeliveryDelayMicros, 1000); - // TODO - // handle cancellation - _ = tracker.StartTracker(consumer, new CancellationTokenSource().Token); - var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, tracker); + IUnackedMessageTracker unackedTracker = options.AckTimeoutMillis > 0 + ? new UnackedMessageTracker(TimeSpan.FromMilliseconds(options.AckTimeoutMillis), TimeSpan.FromSeconds(1)) + : new InactiveUnackedMessageTracker(); + unackedTracker.Start(consumer); + var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, unackedTracker); var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover); _processManager.Add(process); process.Start(); diff --git a/src/DotPulsar/mono_crash.11a6748113.0.json b/src/DotPulsar/mono_crash.11a6748113.0.json new file mode 100644 index 000000000..8eed6a0fe --- /dev/null +++ b/src/DotPulsar/mono_crash.11a6748113.0.json @@ -0,0 +1,731 @@ +{ + "protocol_version" : "0.0.6", + "configuration" : { + "version" : "(6.12.0.113) (2020-02/4fdfb5b1fd5)", + "tlc" : "normal", + "sigsgev" : "altstack", + "notifications" : "kqueue", + "architecture" : "amd64", + "disabled_features" : "none", + "smallconfig" : "disabled", + "bigarrays" : "disabled", + "softdebug" : "enabled", + "interpreter" : "enabled", + "llvm_support" : "0", + "suspend" : "hybrid" + }, + "memory" : { + "Resident Size" : "132136960", + "Virtual Size" : "4652847104", + "minor_gc_time" : "0", + "major_gc_time" : "0", + "minor_gc_count" : "0", + "major_gc_count" : "0", + "major_gc_time_concurrent" : "0" + }, + "threads" : [ + { + "is_managed" : false, + "offset_free_hash" : "0x0", + "offset_rich_hash" : "0x0", + "crashed" : false, + "native_thread_id" : "0x112649d40", + "thread_info_addr" : "0x7fc72580a000", + "thread_name" : "tid_307", + "ctx" : { + "IP" : "0x7fff6ee8b8f6", + "SP" : "0x7ffee847fc68", + "BP" : "0x7ffee847fd00" + }, + "unmanaged_frames" : [ + { + "is_managed" : "false", + "native_address" : "0x107830a06", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cc165", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cbe97", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x10789c9d0", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ef40b1d", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7ffee847f958", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a66ee0", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a7efc1", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079c9084", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079c8e8a", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1077f60fa", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1077821e8", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ed3f405", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x5", + "native_offset" : "0x00000" + } + + ] + }, + { + "is_managed" : false, + "offset_free_hash" : "0x0", + "offset_rich_hash" : "0x0", + "crashed" : false, + "native_thread_id" : "0x7000067cc000", + "thread_info_addr" : "0x7fc726816400", + "thread_name" : "Finalizer", + "ctx" : { + "IP" : "0x7fff6ee88182", + "SP" : "0x7000067cbeb8", + "BP" : "0x7000067cbf00" + }, + "unmanaged_frames" : [ + { + "is_managed" : "false", + "native_address" : "0x107830a06", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cc165", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cbe97", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x10789c9d0", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ef40b1d", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "unregistered" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079ccdad", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ef4bd76", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ef485d7", + "native_offset" : "0x00000" + } + + ] + }, + { + "is_managed" : true, + "offset_free_hash" : "0x11a6748113", + "offset_rich_hash" : "0x11a6748628", + "crashed" : true, + "native_thread_id" : "0x700008b48000", + "thread_info_addr" : "0x7fc726c9fe00", + "thread_name" : "tid_9b03", + "ctx" : { + "IP" : "0x7fff6ee8e47a", + "SP" : "0x700008b40b98", + "BP" : "0x700008b40bc0" + }, + "managed_frames" : [ + { + "is_managed" : "false", + "native_address" : "unregistered" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00015" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x6004287", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x0026a" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x6004487", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00043" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed4", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00071" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed2", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed1", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x0002b" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x6004286", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00093" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0xffffffff" + } +, + { + "is_managed" : "false", + "native_address" : "unregistered" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x600447f", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x0003a" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001f2c", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00025" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed4", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00071" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed2", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed1", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x0002b" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x0002a" + } + + ], + "unmanaged_frames" : [ + { + "is_managed" : "false", + "native_address" : "0x107830a06", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cc165", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cc7da", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x10789da97", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x10783569e", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x10789cd8f", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ef40b1d", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "unregistered" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ee16a08", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a89f67", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a6adef", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a8a3fe", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a8a57f", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a8a5ba", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x107a7ed8e", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079c66d1", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cd34d", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079cafbc", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x6004287", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x6004487", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed4", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed2", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff3933f8eb", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff3933ed13", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff3933ec0d", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff3934168d", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff37bf1d23", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff37bf1c1f", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff37bf1b6f", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff37bd9acc", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff37bd8e13", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff37bd8bea", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x10e7edfcc", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x00000", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", + "token" : "0x600447f", + "native_offset" : "0x0", + "filename" : "System.dll", + "sizeofimage" : "0x292000", + "timestamp" : "0xc58d4017", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001f2c", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "true", + "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", + "token" : "0x6001ed2", + "native_offset" : "0x0", + "filename" : "mscorlib.dll", + "sizeofimage" : "0x472000", + "timestamp" : "0x86058d7a", + "il_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1077929d2", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079a1f97", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079a8840", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x1079ccded", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ef4bd76", + "native_offset" : "0x00000" + } +, + { + "is_managed" : "false", + "native_address" : "0x7fff6ef485d7", + "native_offset" : "0x00000" + } + + ] +} +] +} \ No newline at end of file From d759c0656971daa08a231e1163c09fb81051ddfc Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 23 Dec 2020 18:30:51 +0100 Subject: [PATCH 13/28] Cleanup --- src/DotPulsar/mono_crash.11a6748113.0.json | 731 --------------------- 1 file changed, 731 deletions(-) delete mode 100644 src/DotPulsar/mono_crash.11a6748113.0.json diff --git a/src/DotPulsar/mono_crash.11a6748113.0.json b/src/DotPulsar/mono_crash.11a6748113.0.json deleted file mode 100644 index 8eed6a0fe..000000000 --- a/src/DotPulsar/mono_crash.11a6748113.0.json +++ /dev/null @@ -1,731 +0,0 @@ -{ - "protocol_version" : "0.0.6", - "configuration" : { - "version" : "(6.12.0.113) (2020-02/4fdfb5b1fd5)", - "tlc" : "normal", - "sigsgev" : "altstack", - "notifications" : "kqueue", - "architecture" : "amd64", - "disabled_features" : "none", - "smallconfig" : "disabled", - "bigarrays" : "disabled", - "softdebug" : "enabled", - "interpreter" : "enabled", - "llvm_support" : "0", - "suspend" : "hybrid" - }, - "memory" : { - "Resident Size" : "132136960", - "Virtual Size" : "4652847104", - "minor_gc_time" : "0", - "major_gc_time" : "0", - "minor_gc_count" : "0", - "major_gc_count" : "0", - "major_gc_time_concurrent" : "0" - }, - "threads" : [ - { - "is_managed" : false, - "offset_free_hash" : "0x0", - "offset_rich_hash" : "0x0", - "crashed" : false, - "native_thread_id" : "0x112649d40", - "thread_info_addr" : "0x7fc72580a000", - "thread_name" : "tid_307", - "ctx" : { - "IP" : "0x7fff6ee8b8f6", - "SP" : "0x7ffee847fc68", - "BP" : "0x7ffee847fd00" - }, - "unmanaged_frames" : [ - { - "is_managed" : "false", - "native_address" : "0x107830a06", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cc165", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cbe97", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x10789c9d0", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ef40b1d", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7ffee847f958", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a66ee0", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a7efc1", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079c9084", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079c8e8a", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1077f60fa", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1077821e8", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ed3f405", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x5", - "native_offset" : "0x00000" - } - - ] - }, - { - "is_managed" : false, - "offset_free_hash" : "0x0", - "offset_rich_hash" : "0x0", - "crashed" : false, - "native_thread_id" : "0x7000067cc000", - "thread_info_addr" : "0x7fc726816400", - "thread_name" : "Finalizer", - "ctx" : { - "IP" : "0x7fff6ee88182", - "SP" : "0x7000067cbeb8", - "BP" : "0x7000067cbf00" - }, - "unmanaged_frames" : [ - { - "is_managed" : "false", - "native_address" : "0x107830a06", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cc165", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cbe97", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x10789c9d0", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ef40b1d", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "unregistered" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079ccdad", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ef4bd76", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ef485d7", - "native_offset" : "0x00000" - } - - ] - }, - { - "is_managed" : true, - "offset_free_hash" : "0x11a6748113", - "offset_rich_hash" : "0x11a6748628", - "crashed" : true, - "native_thread_id" : "0x700008b48000", - "thread_info_addr" : "0x7fc726c9fe00", - "thread_name" : "tid_9b03", - "ctx" : { - "IP" : "0x7fff6ee8e47a", - "SP" : "0x700008b40b98", - "BP" : "0x700008b40bc0" - }, - "managed_frames" : [ - { - "is_managed" : "false", - "native_address" : "unregistered" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00015" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x6004287", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x0026a" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x6004487", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00043" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed4", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00071" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed2", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed1", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x0002b" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x6004286", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00093" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0xffffffff" - } -, - { - "is_managed" : "false", - "native_address" : "unregistered" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x600447f", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x0003a" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001f2c", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00025" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed4", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00071" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed2", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed1", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x0002b" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x0002a" - } - - ], - "unmanaged_frames" : [ - { - "is_managed" : "false", - "native_address" : "0x107830a06", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cc165", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cc7da", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x10789da97", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x10783569e", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x10789cd8f", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ef40b1d", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "unregistered" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ee16a08", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a89f67", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a6adef", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a8a3fe", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a8a57f", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a8a5ba", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x107a7ed8e", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079c66d1", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cd34d", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079cafbc", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x6004287", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x6004487", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed4", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed2", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff3933f8eb", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff3933ed13", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff3933ec0d", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff3934168d", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff37bf1d23", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff37bf1c1f", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff37bf1b6f", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff37bd9acc", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff37bd8e13", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff37bd8bea", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x10e7edfcc", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x00000", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "181E198A-F674-4D54-82D2-31B9E17B3F9B", - "token" : "0x600447f", - "native_offset" : "0x0", - "filename" : "System.dll", - "sizeofimage" : "0x292000", - "timestamp" : "0xc58d4017", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001f2c", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "true", - "guid" : "58E88B8B-C383-4E07-A237-B0C4D21C89F2", - "token" : "0x6001ed2", - "native_offset" : "0x0", - "filename" : "mscorlib.dll", - "sizeofimage" : "0x472000", - "timestamp" : "0x86058d7a", - "il_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1077929d2", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079a1f97", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079a8840", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x1079ccded", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ef4bd76", - "native_offset" : "0x00000" - } -, - { - "is_managed" : "false", - "native_address" : "0x7fff6ef485d7", - "native_offset" : "0x00000" - } - - ] -} -] -} \ No newline at end of file From 26cd957a56b6349f9e37bf76bd37f92a7a7e0970 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 3 Jan 2021 18:12:42 +0100 Subject: [PATCH 14/28] Added test suite for unacked message tracker --- .../Abstractions/IUnackedMessageTracker.cs | 3 +- .../Internal/InactiveUnackedMessageTracker.cs | 3 +- .../Internal/ReaderChannelFactory.cs | 2 +- .../Internal/UnackedMessageTracker.cs | 26 ++- tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 + .../Internal/MessageAcksTrackerTests.cs | 164 +++++++++++++++++- 6 files changed, 178 insertions(+), 22 deletions(-) diff --git a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs index ca2b703c5..ea432d1d4 100644 --- a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs @@ -2,6 +2,7 @@ { using DotPulsar.Abstractions; using System.Threading.Tasks; + using System.Threading; using System; public interface IUnackedMessageTracker : IDisposable @@ -10,6 +11,6 @@ public interface IUnackedMessageTracker : IDisposable void Ack(MessageId messageId); - Task Start(IConsumer consumer); + Task Start(IConsumer consumer, CancellationToken cancellationToken = default); } } diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs index 07fd69f1b..9155ab5f6 100644 --- a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -1,5 +1,6 @@ namespace DotPulsar.Internal { + using System.Threading; using System.Threading.Tasks; using Abstractions; using DotPulsar.Abstractions; @@ -20,7 +21,7 @@ public void Add(MessageId messageId) return; } - public Task Start(IConsumer consumer) => Task.CompletedTask; + public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask; public void Dispose() { return; diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs index dcb5ea75a..718084aa4 100644 --- a/src/DotPulsar/Internal/ReaderChannelFactory.cs +++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs @@ -63,7 +63,7 @@ private async ValueTask GetChannel(CancellationToken cancellatio { var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false); var messageQueue = new AsyncQueue(); - var tracker = new InactiveMessageAcksTracker(); // No tracker for reader since readers don't ack. + var tracker = new InactiveUnackedMessageTracker(); // No tracker for reader since readers don't ack. var consumerMessageQueue = new MessageQueue(messageQueue, tracker); var channel = new Channel(_correlationId, _eventRegister, messageQueue); var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false); diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnackedMessageTracker.cs index 299dfa1e2..d52da34da 100644 --- a/src/DotPulsar/Internal/UnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnackedMessageTracker.cs @@ -13,18 +13,15 @@ public readonly struct AwaitingAck { public MessageId MessageId { get; } - public long Timestamp { get; } + public Stopwatch Stopwatch { get; } public AwaitingAck(MessageId messageId) { MessageId = messageId; - Timestamp = Stopwatch.GetTimestamp(); + Stopwatch = Stopwatch.StartNew(); } - public TimeSpan Elapsed => - TimeSpan.FromTicks( - (Stopwatch.GetTimestamp() - Timestamp) / - (Stopwatch.Frequency) * 1000); + public TimeSpan Elapsed => Stopwatch.Elapsed; } public sealed class UnackedMessageTracker : IUnackedMessageTracker @@ -57,29 +54,30 @@ public void Ack(MessageId messageId) _acked.Add(messageId); } - public Task Start(IConsumer consumer) + public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) { - var cancellationToken = _cancellationTokenSource.Token; + CancellationToken token = + CancellationTokenSource.CreateLinkedTokenSource( + _cancellationTokenSource.Token, cancellationToken).Token; return Task.Run(async () => { - while (!cancellationToken.IsCancellationRequested) + while (!token.IsCancellationRequested) { var messages = CheckUnackedMessages(); if (messages.Count() > 0) - await consumer.RedeliverUnacknowledgedMessages(messages, cancellationToken); + await consumer.RedeliverUnacknowledgedMessages(messages, token); - await Task.Delay(_pollingTimeout, cancellationToken); + await Task.Delay(_pollingTimeout, token); } - }, cancellationToken); + }, token); } private IEnumerable CheckUnackedMessages() { - AwaitingAck awaiting; var result = new List(); - while (_awaitingAcks.TryPeek(out awaiting) + while (_awaitingAcks.TryPeek(out AwaitingAck awaiting) && awaiting.Elapsed > _ackTimeout) { // Can I safely use Dequeue now instead of TryDequeue? diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 74cb6b9fc..accfbbfbf 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -17,6 +17,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs index 8a06a76db..f915f1630 100644 --- a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs +++ b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs @@ -1,18 +1,172 @@ namespace DotPulsar.Tests.Internal { using DotPulsar.Internal; + using DotPulsar.Abstractions; using FluentAssertions; - using System.Buffers; - using System.Linq; using Xunit; + using System; + using AutoFixture; + using System.Threading; + using System.Threading.Tasks; + using System.Collections.Generic; + using System.Linq; + using System.Linq.Expressions; + using AutoFixture.AutoNSubstitute; + using NSubstitute; + using System.Diagnostics; - public class MessageAcksTrackerTests + public class UnackedMessageTrackerTests { [Fact] public void Test_Instance() { - var tracker = new MessageAcksTracker(1, 2, 3); - tracker.Should().BeOfType(); + var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1)); + tracker.Should().BeOfType(); + } + + + [Fact] + public async void Test_AwaitingAck_Elapsed() + { + //Arrange + var messageId = MessageId.Latest; + var sw = new Stopwatch(); + sw.Start(); + + //Act + var awaiting = new AwaitingAck(messageId); + await Task.Delay(TimeSpan.FromMilliseconds(123)); + sw.Stop(); + + //Assert + awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1); + } + + [Fact] + public async void Test_Start_Message() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Is(EquivalentTo(new List() { messageId })), + Arg.Any()); + } + + [Fact] + public async void Test_Start_Message_Ack_In_Time() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId)); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .DidNotReceive() + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); + } + + [Fact] + public async void Test_Start_Message_Ack_Too_Late() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(1)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(20); + + var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId)); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); } + + [Fact] + public async void Test_Start_Redeliver_Only_Cnce() + { + //Arrange + var fixture = new Fixture(); + fixture.Customize(new AutoNSubstituteCustomization()); + var consumer = Substitute.For(); + var messageId = MessageId.Latest; + var cts = new CancellationTokenSource(); + + + var tracker = new UnackedMessageTracker( + TimeSpan.FromMilliseconds(10), + TimeSpan.FromMilliseconds(5)); + + //Act + tracker.Add(messageId); + cts.CancelAfter(50); + try { await tracker.Start(consumer, cts.Token); } + catch (TaskCanceledException) { } + + //Assert + await consumer + .Received(1) + .RedeliverUnacknowledgedMessages( + Arg.Any>(), + Arg.Any()); + } + + + private Expression>> EquivalentTo(IEnumerable enumerable) => + x => IsEquivalentIEnumerable(enumerable, x); + + + private bool IsEquivalentIEnumerable(IEnumerable a, IEnumerable b) => + a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _); } } \ No newline at end of file From f4725f5d81e8715a55ab3e4ea6791f903e9e9ad4 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 3 Jan 2021 19:05:29 +0100 Subject: [PATCH 15/28] Refactored elapsed calc --- src/DotPulsar/Internal/UnackedMessageTracker.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnackedMessageTracker.cs index d52da34da..2b5ef030f 100644 --- a/src/DotPulsar/Internal/UnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnackedMessageTracker.cs @@ -13,15 +13,16 @@ public readonly struct AwaitingAck { public MessageId MessageId { get; } - public Stopwatch Stopwatch { get; } + public long Timestamp { get; } public AwaitingAck(MessageId messageId) { MessageId = messageId; - Stopwatch = Stopwatch.StartNew(); + Timestamp = Stopwatch.GetTimestamp(); } - public TimeSpan Elapsed => Stopwatch.Elapsed; + public TimeSpan Elapsed => TimeSpan.FromTicks( + (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond)); } public sealed class UnackedMessageTracker : IUnackedMessageTracker From 51f4a984a5819aeb91ebcd25345b495c87cc02a7 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Sun, 3 Jan 2021 19:14:21 +0100 Subject: [PATCH 16/28] cleanup --- src/DotPulsar/Internal/UnackedMessageTracker.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnackedMessageTracker.cs index 2b5ef030f..48b4f9b08 100644 --- a/src/DotPulsar/Internal/UnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnackedMessageTracker.cs @@ -81,10 +81,8 @@ private IEnumerable CheckUnackedMessages() while (_awaitingAcks.TryPeek(out AwaitingAck awaiting) && awaiting.Elapsed > _ackTimeout) { - // Can I safely use Dequeue now instead of TryDequeue? if (_awaitingAcks.TryDequeue(out awaiting)) { - //If the MessageId is not acknowledged if (!_acked.Contains(awaiting.MessageId)) result.Add(awaiting.MessageId); else From 0119ca95e00be46184f48ab847d5353cf7e6a74f Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 12:42:36 +0100 Subject: [PATCH 17/28] Cleanup old tracker --- .../Abstractions/IMessageAcksTracker.cs | 13 --- .../Internal/InactiveMessageAcksTracker.cs | 30 ----- src/DotPulsar/Internal/MessageAcksTracker.cs | 103 ------------------ 3 files changed, 146 deletions(-) delete mode 100644 src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs delete mode 100644 src/DotPulsar/Internal/InactiveMessageAcksTracker.cs delete mode 100644 src/DotPulsar/Internal/MessageAcksTracker.cs diff --git a/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs b/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs deleted file mode 100644 index b5de8a204..000000000 --- a/src/DotPulsar/Internal/Abstractions/IMessageAcksTracker.cs +++ /dev/null @@ -1,13 +0,0 @@ -namespace DotPulsar.Internal.Abstractions -{ - using DotPulsar.Abstractions; - using System.Threading; - using System.Threading.Tasks; - public interface IMessageAcksTracker - { - T Add(T message); - T Ack(T message); - T Nack(T message); - Task StartTracker(IConsumer consumer, CancellationToken cancellationToken); - } -} \ No newline at end of file diff --git a/src/DotPulsar/Internal/InactiveMessageAcksTracker.cs b/src/DotPulsar/Internal/InactiveMessageAcksTracker.cs deleted file mode 100644 index 8edb7808c..000000000 --- a/src/DotPulsar/Internal/InactiveMessageAcksTracker.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace DotPulsar.Internal -{ - using Abstractions; - using DotPulsar.Abstractions; - using DotPulsar.Exceptions; - using Events; - using Microsoft.Extensions.ObjectPool; - using PulsarApi; - using System; - using System.Collections.Generic; - using System.Linq; - using System.Runtime.CompilerServices; - using System.Threading; - using System.Threading.Tasks; - using System.Diagnostics; - - public sealed class InactiveMessageAcksTracker : IMessageAcksTracker - { - public InactiveMessageAcksTracker() { } - - public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken) - { - await Task.Yield(); - } - - public MessageId Add(MessageId message) => message; - public MessageId Ack(MessageId message) => message; - public MessageId Nack(MessageId message) => message; - } -} \ No newline at end of file diff --git a/src/DotPulsar/Internal/MessageAcksTracker.cs b/src/DotPulsar/Internal/MessageAcksTracker.cs deleted file mode 100644 index 95c65fc0f..000000000 --- a/src/DotPulsar/Internal/MessageAcksTracker.cs +++ /dev/null @@ -1,103 +0,0 @@ -namespace DotPulsar.Internal -{ - using Abstractions; - using DotPulsar.Abstractions; - using DotPulsar.Exceptions; - using Events; - using Microsoft.Extensions.ObjectPool; - using PulsarApi; - using System; - using System.Collections.Generic; - using System.Linq; - using System.Runtime.CompilerServices; - using System.Threading; - using System.Threading.Tasks; - using System.Diagnostics; - - internal class Tracker - { - private readonly Stopwatch _timer; - private int maxTimeoutMs; - - public Tracker(int timeoutMs) - { - maxTimeoutMs = timeoutMs; - _timer = new Stopwatch(); - _timer.Start(); - } - - public bool IsTimedOut() => _timer.ElapsedMilliseconds > maxTimeoutMs; - - public long msTillTimeout => maxTimeoutMs - _timer.ElapsedMilliseconds; - - public void Reset(int newTimeoutMs) - { - maxTimeoutMs = newTimeoutMs; - _timer.Restart(); - } - } - - // TODO add mechnism to stop tracker when disposed - public sealed class MessageAcksTracker : IMessageAcksTracker - { - private readonly Dictionary _trackers; - private readonly int _unackedTimeoutMs; - private readonly int _nackTimeoutMs; - private readonly int _trackerDelayMs; - public MessageAcksTracker(int unackedTimeoutMs, int nackTimeoutMs, int trackerDelayMs) - { - _unackedTimeoutMs = unackedTimeoutMs; - _nackTimeoutMs = nackTimeoutMs; - _trackerDelayMs = trackerDelayMs; - _trackers = new Dictionary(); - } - - public async Task StartTracker(IConsumer consumer, CancellationToken cancellationToken) - { - await Task.Yield(); - - while (true) - { - await Task.Delay(_trackerDelayMs); - - var messageIds = new List(); - foreach (KeyValuePair p in _trackers) - { - if (p.Value.IsTimedOut()) - messageIds.Add(p.Key); - } - - if (messageIds.Count() > 0) - await consumer.RedeliverUnacknowledgedMessages(messageIds, cancellationToken).ConfigureAwait(false); - - } - } - public MessageId Add(MessageId message) - { - if (!_trackers.ContainsKey(message)) - { - _trackers.Add(message, new Tracker(_unackedTimeoutMs)); - } - - return message; - } - public MessageId Ack(MessageId message) - { - if (_trackers.ContainsKey(message)) - _trackers.Remove(message); - return message; - } - public MessageId Nack(MessageId message) - { - if (_trackers.ContainsKey(message)) - { - var timer = _trackers[message]; - if (timer.msTillTimeout > _nackTimeoutMs) - timer.Reset(_nackTimeoutMs); - } - else - _trackers.Add(message, new Tracker(_nackTimeoutMs)); - return message; - } - } -} \ No newline at end of file From 7ad89a20a97bfc9ff1770ee1ae2abdda952870f6 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 12:47:12 +0100 Subject: [PATCH 18/28] Apache header for new files --- .../Internal/Abstractions/IMessageQueue.cs | 14 ++++++++++++++ .../Abstractions/IUnackedMessageTracker.cs | 16 +++++++++++++++- .../Internal/InactiveUnackedMessageTracker.cs | 16 +++++++++++++++- src/DotPulsar/Internal/MessageQueue.cs | 14 ++++++++++++++ src/DotPulsar/Internal/UnackedMessageTracker.cs | 16 +++++++++++++++- .../Internal/MessageAcksTrackerTests.cs | 14 ++++++++++++++ 6 files changed, 87 insertions(+), 3 deletions(-) diff --git a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs index 0b9f2fdd5..79e6d61a3 100644 --- a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs +++ b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs @@ -1,3 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + namespace DotPulsar.Internal.Abstractions { using System; diff --git a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs index ea432d1d4..5896a67a9 100644 --- a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs @@ -1,4 +1,18 @@ -namespace DotPulsar.Internal.Abstractions +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal.Abstractions { using DotPulsar.Abstractions; using System.Threading.Tasks; diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs index 9155ab5f6..cecfe5c65 100644 --- a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -1,4 +1,18 @@ -namespace DotPulsar.Internal +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal { using System.Threading; using System.Threading.Tasks; diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs index 565a70480..68dd81c3b 100644 --- a/src/DotPulsar/Internal/MessageQueue.cs +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -1,3 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + namespace DotPulsar.Internal { using Abstractions; diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnackedMessageTracker.cs index 48b4f9b08..c31341a21 100644 --- a/src/DotPulsar/Internal/UnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnackedMessageTracker.cs @@ -1,4 +1,18 @@ -namespace DotPulsar.Internal +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal { using Abstractions; using DotPulsar.Abstractions; diff --git a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs index f915f1630..d01f12b00 100644 --- a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs +++ b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs @@ -1,3 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + namespace DotPulsar.Tests.Internal { using DotPulsar.Internal; From a7d5f02704d2ab598377009bf1115794c1a6db2b Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 13:23:40 +0100 Subject: [PATCH 19/28] Cleanup inactive message tracker --- .../Internal/InactiveUnackedMessageTracker.cs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs index cecfe5c65..f9aae9652 100644 --- a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -14,16 +14,13 @@ namespace DotPulsar.Internal { - using System.Threading; - using System.Threading.Tasks; using Abstractions; using DotPulsar.Abstractions; + using System.Threading; + using System.Threading.Tasks; public class InactiveUnackedMessageTracker : IUnackedMessageTracker { - public InactiveUnackedMessageTracker() - { - } public void Ack(MessageId messageId) { @@ -37,9 +34,9 @@ public void Add(MessageId messageId) public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask; - public void Dispose() { + public void Dispose() + { return; } - } } From 98850bb8a5c0e6ade58714cf6c8da00527d258db Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 13:23:48 +0100 Subject: [PATCH 20/28] Cleanup stress tests deps --- tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj index 692758095..bdf1d1a1a 100644 --- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj +++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj @@ -7,7 +7,6 @@ - all From 2edfa357bf9dc74292ac1977c09438c7be97e95e Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 13:29:53 +0100 Subject: [PATCH 21/28] Cleanup message queue and interface --- .../Internal/Abstractions/IMessageQueue.cs | 7 ++++--- src/DotPulsar/Internal/MessageQueue.cs | 14 ++++---------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs index 79e6d61a3..dc3af15a8 100644 --- a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs +++ b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs @@ -18,7 +18,8 @@ namespace DotPulsar.Internal.Abstractions public interface IMessageQueue : IDequeue, IDisposable { - void Acknowledge(MessageId obj); - void NegativeAcknowledge(MessageId obj); + void Acknowledge(MessageId messageId); + + void NegativeAcknowledge(MessageId messageId); } -} \ No newline at end of file +} diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs index 68dd81c3b..694ec2749 100644 --- a/src/DotPulsar/Internal/MessageQueue.cs +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -15,15 +15,7 @@ namespace DotPulsar.Internal { using Abstractions; - using DotPulsar.Abstractions; - using DotPulsar.Exceptions; - using Events; - using Microsoft.Extensions.ObjectPool; - using PulsarApi; using System; - using System.Collections.Generic; - using System.Linq; - using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -31,17 +23,20 @@ public sealed class MessageQueue : IMessageQueue, IDequeue, IDis { private readonly AsyncQueue _queue; private readonly IUnackedMessageTracker _tracker; + public MessageQueue(AsyncQueue queue, IUnackedMessageTracker tracker) { _queue = queue; _tracker = tracker; } + public async ValueTask Dequeue(CancellationToken cancellationToken = default) { var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); _tracker.Add(new MessageId(message.MessageId)); return message; } + public void Acknowledge(MessageId obj) => _tracker.Ack(obj); public void NegativeAcknowledge(MessageId obj) @@ -54,6 +49,5 @@ public void Dispose() _queue.Dispose(); _tracker.Dispose(); } - } -} \ No newline at end of file +} From b78ac549fc4d9d99452a293c1bb31ddc47ea0715 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 13:36:10 +0100 Subject: [PATCH 22/28] Formatting tracker --- src/DotPulsar/Internal/UnackedMessageTracker.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnackedMessageTracker.cs index c31341a21..6d3d03fb8 100644 --- a/src/DotPulsar/Internal/UnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnackedMessageTracker.cs @@ -36,7 +36,7 @@ public AwaitingAck(MessageId messageId) } public TimeSpan Elapsed => TimeSpan.FromTicks( - (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double)Stopwatch.Frequency * TimeSpan.TicksPerSecond)); + (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond)); } public sealed class UnackedMessageTracker : IUnackedMessageTracker @@ -47,7 +47,6 @@ public sealed class UnackedMessageTracker : IUnackedMessageTracker private readonly List _acked; private readonly CancellationTokenSource _cancellationTokenSource; - public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout) { _ackTimeout = ackTimeout; @@ -75,7 +74,8 @@ public Task Start(IConsumer consumer, CancellationToken cancellationToken = defa CancellationTokenSource.CreateLinkedTokenSource( _cancellationTokenSource.Token, cancellationToken).Token; - return Task.Run(async () => { + return Task.Run(async () => + { while (!token.IsCancellationRequested) { var messages = CheckUnackedMessages(); From b57668b199726b2b8deda26ab1116a8565a18c27 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 19:33:57 +0100 Subject: [PATCH 23/28] Fixed tests naming, separated tests for struct --- .../Internal/AwaitingAckTests.cs | 42 +++++++++++++++ .../Internal/MessageAcksTrackerTests.cs | 52 +++++-------------- 2 files changed, 54 insertions(+), 40 deletions(-) create mode 100644 tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs diff --git a/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs b/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs new file mode 100644 index 000000000..70aa71575 --- /dev/null +++ b/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests.Internal +{ + using DotPulsar.Internal; + using FluentAssertions; + using System; + using System.Diagnostics; + using System.Threading.Tasks; + using Xunit; + + public class AwaitingAckTests + { + [Fact] + public async void Elapsed_GivenTimeElapsed_ShoulEqualCorrectElapsedTicks() + { + //Arrange + var messageId = MessageId.Latest; + var sw = Stopwatch.StartNew(); + + //Act + var awaiting = new AwaitingAck(messageId); + await Task.Delay(TimeSpan.FromMilliseconds(123)); + sw.Stop(); + + //Assert + awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1); + } + } +} diff --git a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs index d01f12b00..9261ee746 100644 --- a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs +++ b/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs @@ -14,50 +14,24 @@ namespace DotPulsar.Tests.Internal { - using DotPulsar.Internal; + using AutoFixture; + using AutoFixture.AutoNSubstitute; using DotPulsar.Abstractions; - using FluentAssertions; - using Xunit; + using DotPulsar.Internal; + using NSubstitute; using System; - using AutoFixture; - using System.Threading; - using System.Threading.Tasks; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; - using AutoFixture.AutoNSubstitute; - using NSubstitute; - using System.Diagnostics; + using System.Threading; + using System.Threading.Tasks; + using Xunit; public class UnackedMessageTrackerTests { - [Fact] - public void Test_Instance() - { - var tracker = new UnackedMessageTracker(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1)); - tracker.Should().BeOfType(); - } - [Fact] - public async void Test_AwaitingAck_Elapsed() - { - //Arrange - var messageId = MessageId.Latest; - var sw = new Stopwatch(); - sw.Start(); - - //Act - var awaiting = new AwaitingAck(messageId); - await Task.Delay(TimeSpan.FromMilliseconds(123)); - sw.Stop(); - - //Assert - awaiting.Elapsed.Should().BeCloseTo(sw.Elapsed, 1); - } - - [Fact] - public async void Test_Start_Message() + public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver() { //Arrange var fixture = new Fixture(); @@ -86,7 +60,7 @@ await consumer } [Fact] - public async void Test_Start_Message_Ack_In_Time() + public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver() { //Arrange var fixture = new Fixture(); @@ -116,7 +90,7 @@ await consumer } [Fact] - public async void Test_Start_Message_Ack_Too_Late() + public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver() { //Arrange var fixture = new Fixture(); @@ -147,7 +121,7 @@ await consumer } [Fact] - public async void Test_Start_Redeliver_Only_Cnce() + public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliverOnlyOnce() { //Arrange var fixture = new Fixture(); @@ -175,12 +149,10 @@ await consumer Arg.Any()); } - private Expression>> EquivalentTo(IEnumerable enumerable) => x => IsEquivalentIEnumerable(enumerable, x); - private bool IsEquivalentIEnumerable(IEnumerable a, IEnumerable b) => a.Count() == b.Count() && a.Zip(b, (a_, b_) => a_.Equals(b_)).All(_ => _); } -} \ No newline at end of file +} From 871dce6c529c6914ca50033e7a67d4a51769816c Mon Sep 17 00:00:00 2001 From: dionjansen Date: Wed, 6 Jan 2021 19:43:17 +0100 Subject: [PATCH 24/28] Renamed options, use timespans instad of micros --- src/DotPulsar/ConsumerOptions.cs | 5 +++-- src/DotPulsar/PulsarClient.cs | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs index 2c4c2bdfb..2caa1ec2b 100644 --- a/src/DotPulsar/ConsumerOptions.cs +++ b/src/DotPulsar/ConsumerOptions.cs @@ -15,6 +15,7 @@ namespace DotPulsar { using DotPulsar.Abstractions; + using System; /// /// The consumer building options. @@ -106,11 +107,11 @@ public ConsumerOptions(string subscriptionName, string topic) /// Delay to wait before redelivering messages that failed to be processed. /// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. /// - public int NegativeAckRedeliveryDelayMicros { get; set; } + public TimeSpan NegativeAcknowledgementRedeliveryDelay { get; set; } /// /// Timeout of unacked messages /// - public int AckTimeoutMillis { get; set; } + public TimeSpan AcknowledgementTimeout { get; set; } } } diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs index f1817ebf9..54f35aeec 100644 --- a/src/DotPulsar/PulsarClient.cs +++ b/src/DotPulsar/PulsarClient.cs @@ -78,9 +78,9 @@ public IConsumer CreateConsumer(ConsumerOptions options) var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager); if (options.StateChangedHandler is not null) _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler); - IUnackedMessageTracker unackedTracker = options.AckTimeoutMillis > 0 - ? new UnackedMessageTracker(TimeSpan.FromMilliseconds(options.AckTimeoutMillis), TimeSpan.FromSeconds(1)) - : new InactiveUnackedMessageTracker(); + IUnackedMessageTracker unackedTracker = options.AcknowledgementTimeout == default + ? new InactiveUnackedMessageTracker() + : new UnackedMessageTracker(options.AcknowledgementTimeout, TimeSpan.FromSeconds(1)); unackedTracker.Start(consumer); var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, unackedTracker); var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover); From 884193065ce156f95b7079c54c8631d05112a6ce Mon Sep 17 00:00:00 2001 From: dionjansen Date: Fri, 15 Jan 2021 17:13:40 +0100 Subject: [PATCH 25/28] Moved IEquatable and IComparable into MessageIdData partial class --- .../Internal/PulsarApi/MessageIdData.cs | 44 +++++++++++++++++++ src/DotPulsar/MessageId.cs | 22 +--------- 2 files changed, 46 insertions(+), 20 deletions(-) create mode 100644 src/DotPulsar/Internal/PulsarApi/MessageIdData.cs diff --git a/src/DotPulsar/Internal/PulsarApi/MessageIdData.cs b/src/DotPulsar/Internal/PulsarApi/MessageIdData.cs new file mode 100644 index 000000000..af25f2e37 --- /dev/null +++ b/src/DotPulsar/Internal/PulsarApi/MessageIdData.cs @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal.PulsarApi +{ + using System; + + public partial class MessageIdData : IEquatable, IComparable + { + public int CompareTo(MessageIdData? other) + { + if (other is null) + return 1; + + var result = LedgerId.CompareTo(other.LedgerId); + if (result != 0) + return result; + + result = EntryId.CompareTo(other.EntryId); + if (result != 0) + return result; + + result = Partition.CompareTo(other.Partition); + if (result != 0) + return result; + + return BatchIndex.CompareTo(other.BatchIndex); + } + + public bool Equals(MessageIdData? other) + => other is not null && LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex; + } +} diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs index 53bbaa9ad..100d2c5bc 100644 --- a/src/DotPulsar/MessageId.cs +++ b/src/DotPulsar/MessageId.cs @@ -72,25 +72,7 @@ public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex) /// public int BatchIndex => Data.BatchIndex; - public int CompareTo(MessageId? other) - { - if (other is null) - return 1; - - var result = LedgerId.CompareTo(other.LedgerId); - if (result != 0) - return result; - - result = EntryId.CompareTo(other.EntryId); - if (result != 0) - return result; - - result = Partition.CompareTo(other.Partition); - if (result != 0) - return result; - - return BatchIndex.CompareTo(other.BatchIndex); - } + public int CompareTo(MessageId? other) => other is null ? 1 : Data.CompareTo(other.Data); public static bool operator >(MessageId x, MessageId y) => x is not null && x.CompareTo(y) >= 1; @@ -108,7 +90,7 @@ public override bool Equals(object? o) => o is MessageId id && Equals(id); public bool Equals(MessageId? other) - => other is not null && LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex; + => other is not null && Data.Equals(other.Data); public static bool operator ==(MessageId x, MessageId y) => ReferenceEquals(x, y) || (x is not null && x.Equals(y)); From ddfd374d70a04db55d979cb6a9b7d600aa682b1d Mon Sep 17 00:00:00 2001 From: dionjansen Date: Fri, 15 Jan 2021 17:25:51 +0100 Subject: [PATCH 26/28] Renamed ack to full acknowledge, cleaned up tests --- ...racker.cs => IUnacknowledgedMessageTracker.cs} | 4 ++-- src/DotPulsar/Internal/ConsumerChannelFactory.cs | 4 ++-- .../Internal/InactiveUnackedMessageTracker.cs | 4 ++-- src/DotPulsar/Internal/MessageQueue.cs | 8 ++++---- ...Tracker.cs => UnacknowledgedMessageTracker.cs} | 6 +++--- src/DotPulsar/PulsarClient.cs | 2 +- ...ts.cs => UnacknowledgedMessageTrackerTests.cs} | 15 +++------------ 7 files changed, 17 insertions(+), 26 deletions(-) rename src/DotPulsar/Internal/Abstractions/{IUnackedMessageTracker.cs => IUnacknowledgedMessageTracker.cs} (88%) rename src/DotPulsar/Internal/{UnackedMessageTracker.cs => UnacknowledgedMessageTracker.cs} (95%) rename tests/DotPulsar.Tests/Internal/{MessageAcksTrackerTests.cs => UnacknowledgedMessageTrackerTests.cs} (97%) diff --git a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs b/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs similarity index 88% rename from src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs rename to src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs index 5896a67a9..f9b0070f3 100644 --- a/src/DotPulsar/Internal/Abstractions/IUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs @@ -19,11 +19,11 @@ namespace DotPulsar.Internal.Abstractions using System.Threading; using System; - public interface IUnackedMessageTracker : IDisposable + public interface IUnacknowledgedMessageTracker : IDisposable { void Add(MessageId messageId); - void Ack(MessageId messageId); + void Acknowledge(MessageId messageId); Task Start(IConsumer consumer, CancellationToken cancellationToken = default); } diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs index 5b0c05b57..37756097f 100644 --- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs +++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs @@ -31,7 +31,7 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory private readonly int _ackTimeoutMillis; private readonly int _negativeAckRedeliveryDelayMicros; private readonly BatchHandler _batchHandler; - private readonly IUnackedMessageTracker _tracker; + private readonly IUnacknowledgedMessageTracker _tracker; public ConsumerChannelFactory( Guid correlationId, @@ -39,7 +39,7 @@ public ConsumerChannelFactory( IConnectionPool connectionPool, IExecute executor, ConsumerOptions options, - IUnackedMessageTracker tracker) + IUnacknowledgedMessageTracker tracker) { _correlationId = correlationId; _eventRegister = eventRegister; diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs index f9aae9652..0f47b22f8 100644 --- a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -19,10 +19,10 @@ namespace DotPulsar.Internal using System.Threading; using System.Threading.Tasks; - public class InactiveUnackedMessageTracker : IUnackedMessageTracker + public sealed class InactiveUnackedMessageTracker : IUnacknowledgedMessageTracker { - public void Ack(MessageId messageId) + public void Acknowledge(MessageId messageId) { return; } diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs index 694ec2749..e482969d0 100644 --- a/src/DotPulsar/Internal/MessageQueue.cs +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -19,12 +19,12 @@ namespace DotPulsar.Internal using System.Threading; using System.Threading.Tasks; - public sealed class MessageQueue : IMessageQueue, IDequeue, IDisposable + public sealed class MessageQueue : IMessageQueue { private readonly AsyncQueue _queue; - private readonly IUnackedMessageTracker _tracker; + private readonly IUnacknowledgedMessageTracker _tracker; - public MessageQueue(AsyncQueue queue, IUnackedMessageTracker tracker) + public MessageQueue(AsyncQueue queue, IUnacknowledgedMessageTracker tracker) { _queue = queue; _tracker = tracker; @@ -37,7 +37,7 @@ public async ValueTask Dequeue(CancellationToken cancellationTok return message; } - public void Acknowledge(MessageId obj) => _tracker.Ack(obj); + public void Acknowledge(MessageId obj) => _tracker.Acknowledge(obj); public void NegativeAcknowledge(MessageId obj) { diff --git a/src/DotPulsar/Internal/UnackedMessageTracker.cs b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs similarity index 95% rename from src/DotPulsar/Internal/UnackedMessageTracker.cs rename to src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs index 6d3d03fb8..33ec07aca 100644 --- a/src/DotPulsar/Internal/UnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs @@ -39,7 +39,7 @@ public AwaitingAck(MessageId messageId) (long) ((Stopwatch.GetTimestamp() - Timestamp) / (double) Stopwatch.Frequency * TimeSpan.TicksPerSecond)); } - public sealed class UnackedMessageTracker : IUnackedMessageTracker + public sealed class UnackedMessageTracker : IUnacknowledgedMessageTracker { private readonly TimeSpan _ackTimeout; private readonly TimeSpan _pollingTimeout; @@ -61,7 +61,7 @@ public void Add(MessageId messageId) _awaitingAcks.Enqueue(new AwaitingAck(messageId)); } - public void Ack(MessageId messageId) + public void Acknowledge(MessageId messageId) { // We only need to store the highest cumulative ack we see (if there is one) // and the MessageIds not included by that cumulative ack. @@ -109,7 +109,7 @@ private IEnumerable CheckUnackedMessages() public void Dispose() { - this._cancellationTokenSource.Cancel(); + _cancellationTokenSource.Cancel(); } } } diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs index 54f35aeec..a820ee262 100644 --- a/src/DotPulsar/PulsarClient.cs +++ b/src/DotPulsar/PulsarClient.cs @@ -78,7 +78,7 @@ public IConsumer CreateConsumer(ConsumerOptions options) var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager); if (options.StateChangedHandler is not null) _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler); - IUnackedMessageTracker unackedTracker = options.AcknowledgementTimeout == default + IUnacknowledgedMessageTracker unackedTracker = options.AcknowledgementTimeout == default ? new InactiveUnackedMessageTracker() : new UnackedMessageTracker(options.AcknowledgementTimeout, TimeSpan.FromSeconds(1)); unackedTracker.Start(consumer); diff --git a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs b/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs similarity index 97% rename from tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs rename to tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs index 9261ee746..8e6eefeb3 100644 --- a/tests/DotPulsar.Tests/Internal/MessageAcksTrackerTests.cs +++ b/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs @@ -27,9 +27,8 @@ namespace DotPulsar.Tests.Internal using System.Threading.Tasks; using Xunit; - public class UnackedMessageTrackerTests + public class UnacknowledgedMessageTrackerTests { - [Fact] public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver() { @@ -39,8 +38,6 @@ public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver() var consumer = Substitute.For(); var messageId = MessageId.Latest; var cts = new CancellationTokenSource(); - - var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(1)); @@ -68,8 +65,6 @@ public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver() var consumer = Substitute.For(); var messageId = MessageId.Latest; var cts = new CancellationTokenSource(); - - var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(1)); @@ -77,7 +72,7 @@ public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver() //Act tracker.Add(messageId); cts.CancelAfter(20); - var _ = Task.Delay(5).ContinueWith(_ => tracker.Ack(messageId)); + var _ = Task.Delay(5).ContinueWith(_ => tracker.Acknowledge(messageId)); try { await tracker.Start(consumer, cts.Token); } catch (TaskCanceledException) { } @@ -98,8 +93,6 @@ public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver() var consumer = Substitute.For(); var messageId = MessageId.Latest; var cts = new CancellationTokenSource(); - - var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(1)); @@ -108,7 +101,7 @@ public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver() tracker.Add(messageId); cts.CancelAfter(20); - var _ = Task.Delay(15).ContinueWith(_ => tracker.Ack(messageId)); + var _ = Task.Delay(15).ContinueWith(_ => tracker.Acknowledge(messageId)); try { await tracker.Start(consumer, cts.Token); } catch (TaskCanceledException) { } @@ -129,8 +122,6 @@ public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliverOn var consumer = Substitute.For(); var messageId = MessageId.Latest; var cts = new CancellationTokenSource(); - - var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(5)); From ad5827114c8d019aac86d300818b718ae8ab9d19 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Fri, 15 Jan 2021 17:48:30 +0100 Subject: [PATCH 27/28] Refactor use for MessageId to MessageIdData, added RedeliverUnacknowledgedMessages using MessageIdData --- src/DotPulsar/Abstractions/IConsumer.cs | 6 ++++++ .../Internal/Abstractions/IMessageQueue.cs | 5 +++-- .../IUnacknowledgedMessageTracker.cs | 9 +++++---- src/DotPulsar/Internal/Consumer.cs | 7 +++++-- src/DotPulsar/Internal/ConsumerChannel.cs | 2 +- .../Internal/InactiveUnackedMessageTracker.cs | 5 +++-- src/DotPulsar/Internal/MessageQueue.cs | 7 ++++--- .../Internal/UnacknowledgedMessageTracker.cs | 19 ++++++++++--------- .../Internal/AwaitingAckTests.cs | 3 ++- .../UnacknowledgedMessageTrackerTests.cs | 17 +++++++++-------- 10 files changed, 48 insertions(+), 32 deletions(-) diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs index 48fe67170..ba367a98c 100644 --- a/src/DotPulsar/Abstractions/IConsumer.cs +++ b/src/DotPulsar/Abstractions/IConsumer.cs @@ -14,6 +14,7 @@ namespace DotPulsar.Abstractions { + using DotPulsar.Internal.PulsarApi; using System; using System.Collections.Generic; using System.Threading; @@ -127,6 +128,11 @@ public interface IConsumer : IAsyncDisposable /// ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken); + /// + /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged. + /// + ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken); + /// /// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged. /// diff --git a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs index dc3af15a8..a6a52f602 100644 --- a/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs +++ b/src/DotPulsar/Internal/Abstractions/IMessageQueue.cs @@ -14,12 +14,13 @@ namespace DotPulsar.Internal.Abstractions { + using DotPulsar.Internal.PulsarApi; using System; public interface IMessageQueue : IDequeue, IDisposable { - void Acknowledge(MessageId messageId); + void Acknowledge(MessageIdData messageId); - void NegativeAcknowledge(MessageId messageId); + void NegativeAcknowledge(MessageIdData messageId); } } diff --git a/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs b/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs index f9b0070f3..0a294e07e 100644 --- a/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs +++ b/src/DotPulsar/Internal/Abstractions/IUnacknowledgedMessageTracker.cs @@ -15,15 +15,16 @@ namespace DotPulsar.Internal.Abstractions { using DotPulsar.Abstractions; - using System.Threading.Tasks; - using System.Threading; + using DotPulsar.Internal.PulsarApi; using System; + using System.Threading; + using System.Threading.Tasks; public interface IUnacknowledgedMessageTracker : IDisposable { - void Add(MessageId messageId); + void Add(MessageIdData messageId); - void Acknowledge(MessageId messageId); + void Acknowledge(MessageIdData messageId); Task Start(IConsumer consumer, CancellationToken cancellationToken = default); } diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs index c6e9dfcc9..515f723b2 100644 --- a/src/DotPulsar/Internal/Consumer.cs +++ b/src/DotPulsar/Internal/Consumer.cs @@ -110,15 +110,18 @@ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken) => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false); - public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken) + public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken) { ThrowIfDisposed(); var command = new CommandRedeliverUnacknowledgedMessages(); - command.MessageIds.AddRange(messageIds.Select(m => m.Data)); + command.MessageIds.AddRange(messageIds); await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false); } + public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable messageIds, CancellationToken cancellationToken) + => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data), cancellationToken).ConfigureAwait(false); + public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken) => await RedeliverUnacknowledgedMessages(Enumerable.Empty(), cancellationToken).ConfigureAwait(false); diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs index 08a76e8cd..967126869 100644 --- a/src/DotPulsar/Internal/ConsumerChannel.cs +++ b/src/DotPulsar/Internal/ConsumerChannel.cs @@ -109,7 +109,7 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken) command.ConsumerId = _id; - _queue.Acknowledge(new MessageId(messageId)); + _queue.Acknowledge(messageId); await _connection.Send(command, cancellationToken).ConfigureAwait(false); } diff --git a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs index 0f47b22f8..65ff97f09 100644 --- a/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs +++ b/src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs @@ -15,6 +15,7 @@ namespace DotPulsar.Internal { using Abstractions; + using PulsarApi; using DotPulsar.Abstractions; using System.Threading; using System.Threading.Tasks; @@ -22,12 +23,12 @@ namespace DotPulsar.Internal public sealed class InactiveUnackedMessageTracker : IUnacknowledgedMessageTracker { - public void Acknowledge(MessageId messageId) + public void Acknowledge(MessageIdData messageId) { return; } - public void Add(MessageId messageId) + public void Add(MessageIdData messageId) { return; } diff --git a/src/DotPulsar/Internal/MessageQueue.cs b/src/DotPulsar/Internal/MessageQueue.cs index e482969d0..4f4ec5715 100644 --- a/src/DotPulsar/Internal/MessageQueue.cs +++ b/src/DotPulsar/Internal/MessageQueue.cs @@ -15,6 +15,7 @@ namespace DotPulsar.Internal { using Abstractions; + using PulsarApi; using System; using System.Threading; using System.Threading.Tasks; @@ -33,13 +34,13 @@ public MessageQueue(AsyncQueue queue, IUnacknowledgedMessageTrac public async ValueTask Dequeue(CancellationToken cancellationToken = default) { var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); - _tracker.Add(new MessageId(message.MessageId)); + _tracker.Add(message.MessageId); return message; } - public void Acknowledge(MessageId obj) => _tracker.Acknowledge(obj); + public void Acknowledge(MessageIdData obj) => _tracker.Acknowledge(obj); - public void NegativeAcknowledge(MessageId obj) + public void NegativeAcknowledge(MessageIdData obj) { throw new NotImplementedException(); } diff --git a/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs index 33ec07aca..3ec7dbc4c 100644 --- a/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs @@ -16,20 +16,21 @@ namespace DotPulsar.Internal { using Abstractions; using DotPulsar.Abstractions; + using PulsarApi; using System; using System.Collections.Concurrent; + using System.Collections.Generic; using System.Diagnostics; using System.Linq; - using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public readonly struct AwaitingAck { - public MessageId MessageId { get; } + public MessageIdData MessageId { get; } public long Timestamp { get; } - public AwaitingAck(MessageId messageId) + public AwaitingAck(MessageIdData messageId) { MessageId = messageId; Timestamp = Stopwatch.GetTimestamp(); @@ -44,7 +45,7 @@ public sealed class UnackedMessageTracker : IUnacknowledgedMessageTracker private readonly TimeSpan _ackTimeout; private readonly TimeSpan _pollingTimeout; private readonly ConcurrentQueue _awaitingAcks; - private readonly List _acked; + private readonly List _acked; private readonly CancellationTokenSource _cancellationTokenSource; public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout) @@ -52,16 +53,16 @@ public UnackedMessageTracker(TimeSpan ackTimeout, TimeSpan pollingTimeout) _ackTimeout = ackTimeout; _pollingTimeout = pollingTimeout; _awaitingAcks = new ConcurrentQueue(); - _acked = new List(); + _acked = new List(); _cancellationTokenSource = new CancellationTokenSource(); } - public void Add(MessageId messageId) + public void Add(MessageIdData messageId) { _awaitingAcks.Enqueue(new AwaitingAck(messageId)); } - public void Acknowledge(MessageId messageId) + public void Acknowledge(MessageIdData messageId) { // We only need to store the highest cumulative ack we see (if there is one) // and the MessageIds not included by that cumulative ack. @@ -88,9 +89,9 @@ public Task Start(IConsumer consumer, CancellationToken cancellationToken = defa }, token); } - private IEnumerable CheckUnackedMessages() + private IEnumerable CheckUnackedMessages() { - var result = new List(); + var result = new List(); while (_awaitingAcks.TryPeek(out AwaitingAck awaiting) && awaiting.Elapsed > _ackTimeout) diff --git a/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs b/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs index 70aa71575..793de3146 100644 --- a/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs +++ b/tests/DotPulsar.Tests/Internal/AwaitingAckTests.cs @@ -15,6 +15,7 @@ namespace DotPulsar.Tests.Internal { using DotPulsar.Internal; + using DotPulsar.Internal.PulsarApi; using FluentAssertions; using System; using System.Diagnostics; @@ -27,7 +28,7 @@ public class AwaitingAckTests public async void Elapsed_GivenTimeElapsed_ShoulEqualCorrectElapsedTicks() { //Arrange - var messageId = MessageId.Latest; + var messageId = new MessageIdData(); var sw = Stopwatch.StartNew(); //Act diff --git a/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs b/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs index 8e6eefeb3..396472c92 100644 --- a/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs +++ b/tests/DotPulsar.Tests/Internal/UnacknowledgedMessageTrackerTests.cs @@ -18,6 +18,7 @@ namespace DotPulsar.Tests.Internal using AutoFixture.AutoNSubstitute; using DotPulsar.Abstractions; using DotPulsar.Internal; + using DotPulsar.Internal.PulsarApi; using NSubstitute; using System; using System.Collections.Generic; @@ -36,7 +37,7 @@ public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver() var fixture = new Fixture(); fixture.Customize(new AutoNSubstituteCustomization()); var consumer = Substitute.For(); - var messageId = MessageId.Latest; + var messageId = new MessageIdData(); var cts = new CancellationTokenSource(); var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), @@ -52,7 +53,7 @@ public async void Start_GivenAMessageIdIsNotAcked_ShouldRedeliver() await consumer .Received(1) .RedeliverUnacknowledgedMessages( - Arg.Is(EquivalentTo(new List() { messageId })), + Arg.Is(EquivalentTo(new List() { messageId })), Arg.Any()); } @@ -63,7 +64,7 @@ public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver() var fixture = new Fixture(); fixture.Customize(new AutoNSubstituteCustomization()); var consumer = Substitute.For(); - var messageId = MessageId.Latest; + var messageId = new MessageIdData(); var cts = new CancellationTokenSource(); var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), @@ -80,7 +81,7 @@ public async void Start_GivenAMessageIdIsAckedWithinTimeout_ShouldNotRedeliver() await consumer .DidNotReceive() .RedeliverUnacknowledgedMessages( - Arg.Any>(), + Arg.Any>(), Arg.Any()); } @@ -91,7 +92,7 @@ public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver() var fixture = new Fixture(); fixture.Customize(new AutoNSubstituteCustomization()); var consumer = Substitute.For(); - var messageId = MessageId.Latest; + var messageId = new MessageIdData(); var cts = new CancellationTokenSource(); var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), @@ -109,7 +110,7 @@ public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliver() await consumer .Received(1) .RedeliverUnacknowledgedMessages( - Arg.Any>(), + Arg.Any>(), Arg.Any()); } @@ -120,7 +121,7 @@ public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliverOn var fixture = new Fixture(); fixture.Customize(new AutoNSubstituteCustomization()); var consumer = Substitute.For(); - var messageId = MessageId.Latest; + var messageId = new MessageIdData(); var cts = new CancellationTokenSource(); var tracker = new UnackedMessageTracker( TimeSpan.FromMilliseconds(10), @@ -136,7 +137,7 @@ public async void Start_GivenAMessageIdIsNotAckedWithinTimeout_ShouldRedeliverOn await consumer .Received(1) .RedeliverUnacknowledgedMessages( - Arg.Any>(), + Arg.Any>(), Arg.Any()); } From f02166b4b045bf057d4204b9a1e9e558faa79913 Mon Sep 17 00:00:00 2001 From: dionjansen Date: Fri, 15 Jan 2021 18:01:21 +0100 Subject: [PATCH 28/28] Refactored Start method to not use unnecessary Task.run --- .../Internal/UnacknowledgedMessageTracker.cs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs index 3ec7dbc4c..df531a609 100644 --- a/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs +++ b/src/DotPulsar/Internal/UnacknowledgedMessageTracker.cs @@ -69,24 +69,23 @@ public void Acknowledge(MessageIdData messageId) _acked.Add(messageId); } - public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) + public async Task Start(IConsumer consumer, CancellationToken cancellationToken = default) { + await Task.Yield(); + CancellationToken token = CancellationTokenSource.CreateLinkedTokenSource( _cancellationTokenSource.Token, cancellationToken).Token; - return Task.Run(async () => + while (!token.IsCancellationRequested) { - while (!token.IsCancellationRequested) - { - var messages = CheckUnackedMessages(); + var messages = CheckUnackedMessages(); - if (messages.Count() > 0) - await consumer.RedeliverUnacknowledgedMessages(messages, token); + if (messages.Count() > 0) + await consumer.RedeliverUnacknowledgedMessages(messages, token); - await Task.Delay(_pollingTimeout, token); - } - }, token); + await Task.Delay(_pollingTimeout, token); + } } private IEnumerable CheckUnackedMessages()