-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathSubscriber.cs
More file actions
175 lines (145 loc) · 6.37 KB
/
Subscriber.cs
File metadata and controls
175 lines (145 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
namespace Cloudtoid.Interprocess;
internal sealed class Subscriber : Queue, ISubscriber
{
private static readonly long TicksForTenSeconds = TimeSpan.FromSeconds(10).Ticks;
private readonly CancellationTokenSource cancellationSource = new();
private readonly CountdownEvent countdownEvent = new(1);
private readonly IInterprocessSemaphoreWaiter signal;
internal Subscriber(QueueOptions options, ILoggerFactory loggerFactory)
: base(options, loggerFactory) => signal = InterprocessSemaphore.CreateWaiter(options.QueueName);
public bool TryDequeue(CancellationToken cancellation, out ReadOnlyMemory<byte> message) =>
TryDequeueCore(default, cancellation, out message);
public bool TryDequeue(Memory<byte> buffer, CancellationToken cancellation, out ReadOnlyMemory<byte> message) =>
TryDequeueCore(buffer, cancellation, out message);
public ReadOnlyMemory<byte> Dequeue(CancellationToken cancellation) =>
DequeueCore(default, cancellation);
public ReadOnlyMemory<byte> Dequeue(Memory<byte> buffer, CancellationToken cancellation) =>
DequeueCore(buffer, cancellation);
protected override void Dispose(bool disposing)
{
// drain the Dequeue/TryDequeue requests
cancellationSource.Cancel();
countdownEvent.Signal();
countdownEvent.Wait();
// There is a potential for a race condition in DequeueCore if the cancellationSource
// was not cancelled before AddEvent is called. The sleep here will prevent that.
Thread.Sleep(millisecondsTimeout: 10);
if (disposing)
{
countdownEvent.Dispose();
signal.Dispose();
cancellationSource.Dispose();
}
base.Dispose(disposing);
}
private bool TryDequeueCore(
Memory<byte>? resultBuffer,
CancellationToken cancellation,
out ReadOnlyMemory<byte> message)
{
// do NOT reorder the cancellation and the AddCount operation below. See Dispose for more information.
cancellationSource.ThrowIfCancellationRequested(cancellation);
countdownEvent.AddCount();
try
{
return TryDequeueImpl(resultBuffer, cancellation, out message);
}
finally
{
countdownEvent.Signal();
}
}
private ReadOnlyMemory<byte> DequeueCore(Memory<byte>? resultBuffer, CancellationToken cancellation)
{
// do NOT reorder the cancellation and the AddCount operation below. See Dispose for more information.
cancellationSource.ThrowIfCancellationRequested(cancellation);
countdownEvent.AddCount();
try
{
int i = -5;
while (true)
{
if (TryDequeueImpl(resultBuffer, cancellation, out var message))
return message;
if (i > 10)
signal.Wait(millisecondsTimeout: 10);
else if (i++ > 0)
signal.Wait(millisecondsTimeout: i);
else
Thread.Yield();
}
}
finally
{
countdownEvent.Signal();
}
}
private unsafe bool TryDequeueImpl(
Memory<byte>? resultBuffer,
CancellationToken cancellation,
out ReadOnlyMemory<byte> message)
{
cancellationSource.ThrowIfCancellationRequested(cancellation);
message = ReadOnlyMemory<byte>.Empty;
var header = *Header;
// is this an empty queue?
if (header.IsEmpty())
return false;
var readLockTimestamp = header.ReadLockTimestamp;
var start = DateTime.UtcNow.Ticks;
// is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed?
if (start - readLockTimestamp < TicksForTenSeconds)
return false;
// take a read-lock so no other thread can read a message
if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, start, readLockTimestamp) != readLockTimestamp)
return false;
try
{
// is the queue empty now that we were able to get a read-lock?
if (Header->IsEmpty())
return false;
// now finally have a read-lock and the queue is not empty
var readOffset = Header->ReadOffset;
var writeOffset = Header->WriteOffset;
var messageHeader = (MessageHeader*)Buffer.GetPointer(readOffset);
while (true)
{
// was this message fully written by the publisher? if not, wait for the publisher to finish writing it
var state = Interlocked.CompareExchange(
ref messageHeader->State,
MessageHeader.LockedToBeConsumedState,
MessageHeader.ReadyToBeConsumedState);
if (state == MessageHeader.ReadyToBeConsumedState)
break;
// but if the publisher crashed, we will never get the message, so we need to handle that case by timing out
if (DateTime.UtcNow.Ticks - start > TicksForTenSeconds)
{
// the publisher crashed and we will never get the message
// so we need to release the read-lock and advance the queue for everyone.
// some messages might be lost in this case but this is the best we can do.
Interlocked.Exchange(ref Header->ReadOffset, writeOffset);
return false;
}
Thread.Yield();
}
// read the message body from the queue
var bodyLength = messageHeader->BodyLength;
message = Buffer.Read(
GetMessageBodyOffset(readOffset),
bodyLength,
resultBuffer);
// zero out the message, including the message header
var messageLength = GetPaddedMessageLength(bodyLength);
Buffer.Clear(readOffset, messageLength);
// update the read offset of the queue
var newReadOffset = SafeIncrementMessageOffset(readOffset, messageLength);
Interlocked.Exchange(ref Header->ReadOffset, newReadOffset);
}
finally
{
// release the read-lock
Interlocked.Exchange(ref Header->ReadLockTimestamp, 0L);
}
return true;
}
}