Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4eb2b06
Added missing reference to Microsoft.Bcl.AsyncInterfaces in samples a…
dionjansen Dec 19, 2020
dfb864a
nack delay option
dionjansen Dec 20, 2020
370e1c8
ack timeout config option
dionjansen Dec 20, 2020
278005f
naive implementation tracker and message queue
dionjansen Dec 20, 2020
79bc906
use message package for queue
dionjansen Dec 20, 2020
546d9e9
use int for ms values
dionjansen Dec 21, 2020
5e3cea6
Non generic interface
dionjansen Dec 21, 2020
19931bc
Added basic implementation for tracker, integrated messagequeue in co…
dionjansen Dec 21, 2020
295eab8
Create conditional tracker in client
dionjansen Dec 21, 2020
2be5114
Inform queue when acking
dionjansen Dec 21, 2020
163ea45
Cleanup references for VSCode
dionjansen Dec 22, 2020
6fc4550
Merge branch 'master' into nack
dionjansen Dec 23, 2020
7da197e
Unacked tracker only
dionjansen Dec 23, 2020
d759c06
Cleanup
dionjansen Dec 23, 2020
26cd957
Added test suite for unacked message tracker
dionjansen Jan 3, 2021
f4725f5
Refactored elapsed calc
dionjansen Jan 3, 2021
51f4a98
cleanup
dionjansen Jan 3, 2021
0119ca9
Cleanup old tracker
dionjansen Jan 6, 2021
7ad89a2
Apache header for new files
dionjansen Jan 6, 2021
a7d5f02
Cleanup inactive message tracker
dionjansen Jan 6, 2021
98850bb
Cleanup stress tests deps
dionjansen Jan 6, 2021
2edfa35
Cleanup message queue and interface
dionjansen Jan 6, 2021
b78ac54
Formatting tracker
dionjansen Jan 6, 2021
b57668b
Fixed tests naming, separated tests for struct
dionjansen Jan 6, 2021
871dce6
Renamed options, use timespans instad of micros
dionjansen Jan 6, 2021
8841930
Moved IEquatable and IComparable into MessageIdData partial class
dionjansen Jan 15, 2021
ddfd374
Renamed ack to full acknowledge, cleaned up tests
dionjansen Jan 15, 2021
ad58271
Refactor use for MessageId to MessageIdData, added RedeliverUnacknowl…
dionjansen Jan 15, 2021
f02166b
Refactored Start method to not use unnecessary Task.run
dionjansen Jan 15, 2021
5771533
Merge branch 'master' into nack
dionjansen Feb 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/DotPulsar/Abstractions/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace DotPulsar.Abstractions
{
using DotPulsar.Internal.PulsarApi;
using System;
using System.Collections.Generic;
using System.Threading;
Expand Down Expand Up @@ -59,6 +60,11 @@ public interface IConsumer : IGetLastMessageId, IReceive, ISeek, IState<Consumer
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default);

/// <summary>
/// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageIdData> messageIds, CancellationToken cancellationToken);

/// <summary>
/// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions src/DotPulsar/ConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
namespace DotPulsar
{
using DotPulsar.Abstractions;
using System;

/// <summary>
/// The consumer building options.
Expand Down Expand Up @@ -104,5 +105,16 @@ public ConsumerOptions(string subscriptionName, string topic)
/// Set the topic for this consumer. This is required.
/// </summary>
public string Topic { get; set; }

/// <summary>
/// Delay to wait before redelivering messages that failed to be processed.
/// When an application uses IConsumer.NegativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
/// </summary>
public TimeSpan NegativeAcknowledgementRedeliveryDelay { get; set; }

/// <summary>
/// Timeout of unacked messages
/// </summary>
public TimeSpan AcknowledgementTimeout { get; set; }
}
}
26 changes: 26 additions & 0 deletions src/DotPulsar/Internal/Abstractions/IMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Internal.PulsarApi;
using System;

public interface IMessageQueue : IDequeue<MessagePackage>, IDisposable
{
void Acknowledge(MessageIdData messageId);

void NegativeAcknowledge(MessageIdData messageId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.Abstractions
{
using DotPulsar.Abstractions;
using DotPulsar.Internal.PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;

public interface IUnacknowledgedMessageTracker : IDisposable
{
void Add(MessageIdData messageId);

void Acknowledge(MessageIdData messageId);

Task Start(IConsumer consumer, CancellationToken cancellationToken = default);
}
}
5 changes: 4 additions & 1 deletion src/DotPulsar/Internal/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancel
public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
=> await Acknowledge(messageId, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);

public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageIdData> messageIds, CancellationToken cancellationToken)
{
ThrowIfDisposed();

Expand All @@ -112,6 +112,9 @@ public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> me
await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
}

public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
=> await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data), cancellationToken).ConfigureAwait(false);

public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
=> await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);

Expand Down
7 changes: 5 additions & 2 deletions src/DotPulsar/Internal/ConsumerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace DotPulsar.Internal
public sealed class ConsumerChannel : IConsumerChannel
{
private readonly ulong _id;
private readonly AsyncQueue<MessagePackage> _queue;
private readonly MessageQueue _queue;
private readonly IConnection _connection;
private readonly BatchHandler _batchHandler;
private readonly CommandFlow _cachedCommandFlow;
Expand All @@ -35,7 +35,7 @@ public sealed class ConsumerChannel : IConsumerChannel
public ConsumerChannel(
ulong id,
uint messagePrefetchCount,
AsyncQueue<MessagePackage> queue,
MessageQueue queue,
IConnection connection,
BatchHandler batchHandler)
{
Expand Down Expand Up @@ -108,6 +108,9 @@ public async Task Send(CommandAck command, CancellationToken cancellationToken)
}

command.ConsumerId = _id;

_queue.Acknowledge(messageId);

await _connection.Send(command, cancellationToken).ConfigureAwait(false);
}

Expand Down
10 changes: 8 additions & 2 deletions src/DotPulsar/Internal/ConsumerChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,25 @@ public sealed class ConsumerChannelFactory : IConsumerChannelFactory
private readonly IExecute _executor;
private readonly CommandSubscribe _subscribe;
private readonly uint _messagePrefetchCount;
private readonly int _ackTimeoutMillis;
private readonly int _negativeAckRedeliveryDelayMicros;
private readonly BatchHandler _batchHandler;
private readonly IUnacknowledgedMessageTracker _tracker;

public ConsumerChannelFactory(
Guid correlationId,
IRegisterEvent eventRegister,
IConnectionPool connectionPool,
IExecute executor,
ConsumerOptions options)
ConsumerOptions options,
IUnacknowledgedMessageTracker tracker)
{
_correlationId = correlationId;
_eventRegister = eventRegister;
_connectionPool = connectionPool;
_executor = executor;
_messagePrefetchCount = options.MessagePrefetchCount;
_tracker = tracker;

_subscribe = new CommandSubscribe
{
Expand All @@ -64,9 +69,10 @@ private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellat
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var consumerMessageQueue = new MessageQueue(messageQueue, _tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler);
}
}
}
43 changes: 43 additions & 0 deletions src/DotPulsar/Internal/InactiveUnackedMessageTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal
{
using Abstractions;
using PulsarApi;
using DotPulsar.Abstractions;
using System.Threading;
using System.Threading.Tasks;

public sealed class InactiveUnackedMessageTracker : IUnacknowledgedMessageTracker
{

public void Acknowledge(MessageIdData messageId)
{
return;
}

public void Add(MessageIdData messageId)
{
return;
}

public Task Start(IConsumer consumer, CancellationToken cancellationToken = default) => Task.CompletedTask;

public void Dispose()
{
return;
}
}
}
54 changes: 54 additions & 0 deletions src/DotPulsar/Internal/MessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal
{
using Abstractions;
using PulsarApi;
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class MessageQueue : IMessageQueue
{
private readonly AsyncQueue<MessagePackage> _queue;
private readonly IUnacknowledgedMessageTracker _tracker;

public MessageQueue(AsyncQueue<MessagePackage> queue, IUnacknowledgedMessageTracker tracker)
{
_queue = queue;
_tracker = tracker;
}

public async ValueTask<MessagePackage> Dequeue(CancellationToken cancellationToken = default)
{
var message = await _queue.Dequeue(cancellationToken).ConfigureAwait(false);
_tracker.Add(message.MessageId);
return message;
}

public void Acknowledge(MessageIdData obj) => _tracker.Acknowledge(obj);

public void NegativeAcknowledge(MessageIdData obj)
{
throw new NotImplementedException();
}

public void Dispose()
{
_queue.Dispose();
_tracker.Dispose();
}
}
}
44 changes: 44 additions & 0 deletions src/DotPulsar/Internal/PulsarApi/MessageIdData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace DotPulsar.Internal.PulsarApi
{
using System;

public partial class MessageIdData : IEquatable<MessageIdData>, IComparable<MessageIdData>
{
public int CompareTo(MessageIdData? other)
{
if (other is null)
return 1;

var result = LedgerId.CompareTo(other.LedgerId);
if (result != 0)
return result;

result = EntryId.CompareTo(other.EntryId);
if (result != 0)
return result;

result = Partition.CompareTo(other.Partition);
if (result != 0)
return result;

return BatchIndex.CompareTo(other.BatchIndex);
}

public bool Equals(MessageIdData? other)
=> other is not null && LedgerId == other.LedgerId && EntryId == other.EntryId && Partition == other.Partition && BatchIndex == other.BatchIndex;
}
}
4 changes: 3 additions & 1 deletion src/DotPulsar/Internal/ReaderChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellat
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
var tracker = new InactiveUnackedMessageTracker(); // No tracker for reader since readers don't ack.
var consumerMessageQueue = new MessageQueue(messageQueue, tracker);
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, consumerMessageQueue, connection, _batchHandler);
}
}
}
Loading