Skip to content

Commit e76590e

Browse files
committed
Update Arq layer, (dis)connected still missing
1 parent 01f1123 commit e76590e

2 files changed

Lines changed: 85 additions & 43 deletions

File tree

src/LibStored.Net/Protocol/ArqLayer.cs

Lines changed: 72 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class ArqLayer : ProtocolLayer
5959
/// <summary>
6060
/// Number of successive retransmits before the event is emitted.
6161
/// </summary>
62-
public const int RetransmitCallbackThreshold = 10;
62+
public const int RetransmitCallbackThreshold = 16;
6363

6464
private const byte NopFlag = 0x40;
6565
private const byte AckFlag = 0x80;
@@ -71,6 +71,7 @@ public class ArqLayer : ProtocolLayer
7171

7272
private int _encodeQueueBytes;
7373
private bool _encoding;
74+
private bool _connected;
7475
private bool _pauseTransmit;
7576
private bool _didTransmit;
7677
private byte _sendSeq;
@@ -103,15 +104,16 @@ public override void Reset()
103104
}
104105

105106
_encoding = false;
107+
_connected = false;
106108
_pauseTransmit = false;
107109
_didTransmit = false;
108110
_retransmits = 0;
109111
_sendSeq = 0;
110112
_recvSeq = 0;
111113
_buffer.Clear();
112114

115+
// base.Disconnected();
113116
base.Reset();
114-
115117
KeepAlive();
116118
}
117119

@@ -125,6 +127,8 @@ public override bool Flush()
125127
/// <inheritdoc />
126128
public override void Encode(ReadOnlySpan<byte> buffer, bool last)
127129
{
130+
bool isIdle = !WaitingForAck();
131+
128132
if (_maxEncodeBufferSize > 0 && _maxEncodeBufferSize < _encodeQueueBytes + buffer.Length + 1 )
129133
{
130134
Event(ArqEvent.EncodeBufferOverflow);
@@ -154,80 +158,94 @@ public override void Encode(ReadOnlySpan<byte> buffer, bool last)
154158
}
155159
}
156160

157-
Transmit();
161+
if (isIdle)
162+
{
163+
Transmit();
164+
}
158165
}
159166

160167
/// <inheritdoc/>
161168
public override void Decode(Span<byte> buffer)
162169
{
163-
bool reconnect = false;
170+
bool resetHandshake = false;
164171

165172
Span<byte> response = [0, 0];
166173
byte responseLen = 0;
167174
bool doTransmit = false;
168175
bool doDecode = false;
169176

177+
Debug.Assert(!_pauseTransmit);
178+
_pauseTransmit = true;
179+
170180
while (buffer.Length > 0)
171181
{
172182
byte header = buffer[0];
183+
byte headerSeq = (byte)(header & SeqMask);
184+
173185
if ((header & AckFlag) != 0)
174186
{
175-
if (WaitingForAck() && (header & SeqMask) == (_encodeQueue.First()[0] & SeqMask))
187+
if (headerSeq == 0)
188+
{
189+
// Maybe be an ack to our reset message.
190+
resetHandshake = true;
191+
}
192+
193+
if (WaitingForAck() && headerSeq == (_encodeQueue.First()[0] & SeqMask))
176194
{
177195
PopEncodeQueue();
178196
_retransmits = 0;
179197

180198
doTransmit = true;
181199

182-
if ((header & SeqMask) == 0)
200+
if (resetHandshake)
183201
{
184-
reconnect = true;
202+
// This is an ack to our reset message.
203+
_connected = true;
204+
_recvSeq = NextSeq(0);
185205
// Libstored uses a method, here an event is used.
186206
Event(ArqEvent.Connected);
187207
}
188208
}
189209

190210
buffer = buffer.Slice(1);
191211
}
192-
else if((header & SeqMask) == _recvSeq)
212+
else if (headerSeq == 0)
193213
{
194-
// The next message
195-
response[responseLen++] = (byte) (_recvSeq | AckFlag);
196-
_recvSeq = NextSeq(_recvSeq);
197-
198-
doDecode = (header & NopFlag) == 0;
199-
doTransmit = true;
200-
201-
buffer = buffer.Slice(1);
202-
}
203-
else if ((header & SeqMask) == 0)
204-
{
205-
// Unexpected reset
206-
Event(ArqEvent.Reconnect);
214+
// Reset handshake
207215

208216
// Send ack
209-
_recvSeq = NextSeq(0);
210217
response[responseLen++] = AckFlag;
218+
// Drop the rest
219+
buffer = Span<byte>.Empty;
211220

212221
// Also reset our send seq.
213-
if (!reconnect && (_encodeQueueBytes == 0 || _encodeQueue.First()[0] != NopFlag))
222+
if (!resetHandshake)
214223
{
215-
// Insert at front of queue.
216-
PushEncodeQueueRaw([NopFlag], false);
217-
_sendSeq = NextSeq(0);
224+
PushReset();
225+
226+
doTransmit = true;
218227

219-
// Re-encode existing outbound messages.
220-
foreach (byte[] bytes in _encodeQueue.Skip(1))
228+
if (_connected)
221229
{
222-
bytes[0] = (byte)((bytes[0] & ~SeqMask) | _sendSeq);
223-
_sendSeq = NextSeq(_sendSeq);
230+
_connected = false;
231+
Event(ArqEvent.Reconnect);
224232
}
233+
234+
// base.Disconnected()
225235
}
236+
}
237+
else if(headerSeq == _recvSeq)
238+
{
239+
// The next message
240+
response[responseLen++] = (byte) (_recvSeq | AckFlag);
241+
_recvSeq = NextSeq(_recvSeq);
226242

243+
doDecode = (header & NopFlag) == 0;
227244
doTransmit = true;
245+
228246
buffer = buffer.Slice(1);
229247
}
230-
else if(NextSeq((byte)(header & SeqMask)) == _recvSeq)
248+
else if(NextSeq(headerSeq) == _recvSeq)
231249
{
232250
// Retransmit, send an Ack again
233251
response[responseLen++] = (byte) (header & SeqMask | AckFlag);
@@ -263,21 +281,19 @@ public override void Decode(Span<byte> buffer)
263281

264282
if (doDecode)
265283
{
266-
Debug.Assert(!_pauseTransmit);
267-
_pauseTransmit = true;
268-
269284
_didTransmit = false;
270285
// Decode and queue only
271286
base.Decode(buffer);
272287
if (_didTransmit)
273288
{
274289
doTransmit = true;
275290
}
276-
277-
Debug.Assert(_pauseTransmit);
278-
_pauseTransmit = false;
279291
}
280292

293+
// Dont expect recursion
294+
Debug.Assert(_pauseTransmit);
295+
_pauseTransmit = false;
296+
281297
if (responseLen > 0)
282298
{
283299
base.Encode(response.Slice(0,responseLen), !doTransmit);
@@ -308,6 +324,13 @@ public void KeepAlive()
308324
Transmit();
309325
}
310326

327+
/// <summary>
328+
/// Call this function at a regular interval to retransmit messages, when necessary.
329+
/// When no messages are queued, this function does nothing.
330+
/// </summary>
331+
/// <returns>True when a message was sent.</returns>
332+
public bool Process() => Transmit();
333+
311334
/// <summary>
312335
/// Transmit the first message in the encode queue.
313336
/// </summary>
@@ -351,6 +374,18 @@ private void PushEncodeQueue(ReadOnlySpan<byte> buffer)
351374
PushEncodeQueueRaw(bytes);
352375
}
353376

377+
private void PushReset()
378+
{
379+
while (_encodeQueue.Count > 0)
380+
{
381+
PopEncodeQueue();
382+
}
383+
384+
_sendSeq = 0;
385+
PushEncodeQueueRaw([NopFlag], false);
386+
_recvSeq = 0;
387+
}
388+
354389
private void PushEncodeQueueRaw(byte[] bytes, bool back = true)
355390
{
356391
if (back)

test/LibStored.Net.Tests/ArqLayerTests.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public void RetransmitTest()
7676

7777
top.Encode(" 2"u8, true);
7878
// Triggers retransmit of 1
79+
arq.KeepAlive();
7980
Assert.Equal(ProtocolTests.String([0x01, .." 1"u8]), bottom.Encoded[1]);
8081

8182
top.Flush();
@@ -191,32 +192,35 @@ public void ReconnectTest()
191192
Assert.Equal(ProtocolTests.String([0x80, 0x40]), bottom.Encoded[2]);
192193

193194
top.Encode(" 3"u8, true);
195+
arq.KeepAlive();
194196
Assert.Equal(ProtocolTests.String([0x40]), bottom.Encoded[3]); // Retransmit
195197

196198
bottom.Decode([0x40]);
197199
Assert.Equal(ProtocolTests.String([0x80, 0x40]), bottom.Encoded[4]); // Reconnect
198200

199201
// Separate ack/reset does not fully reconnect; expect reset.
200202
bottom.Decode([0xC0]);
201-
Assert.Equal(ProtocolTests.String([0x01, .." 3"u8]), bottom.Encoded[5]);
203+
arq.KeepAlive(); // nop, no retransmit after reset
204+
Assert.Equal(ProtocolTests.String([0x41]), bottom.Encoded[5]);
202205
bottom.Decode([0x40]);
203206
Assert.Equal(ProtocolTests.String([0x80, 0x40]), bottom.Encoded[6]); // Full reset again
204207

205208
// In same message, reconnection completes.
206209
bottom.Decode([0xC0, 0x40]);
207-
Assert.Equal(ProtocolTests.String([0x80, 0x01, .." 3"u8]), bottom.Encoded[7]);
210+
Assert.Equal(ProtocolTests.String([0x80]), bottom.Encoded[7]);
208211
bottom.Decode([0x81]);
209212

210213
top.Encode(" 4"u8, true);
211-
Assert.Equal(ProtocolTests.String([0x02, .." 4"u8]), bottom.Encoded[8]);
212-
bottom.Decode([0x82]);
214+
Assert.Equal(ProtocolTests.String([0x01, .." 4"u8]), bottom.Encoded[8]);
215+
bottom.Decode([0x81]);
213216

214217
top.Encode(" 5"u8, true);
215-
Assert.Equal(ProtocolTests.String([0x03, .." 5"u8]), bottom.Encoded[9]);
218+
Assert.Equal(ProtocolTests.String([0x02, .." 5"u8]), bottom.Encoded[9]);
216219
bottom.Decode([0x40]);
217220
Assert.Equal(ProtocolTests.String([0x80, 0x40]), bottom.Encoded[10]); // Full reset again
218221
bottom.Decode([0x80]);
219-
Assert.Equal(ProtocolTests.String([0x01, .." 5"u8]), bottom.Encoded[11]);
222+
arq.KeepAlive();
223+
Assert.Equal(ProtocolTests.String([0x41]), bottom.Encoded[11]);
220224
}
221225

222226
[Fact]
@@ -232,6 +236,9 @@ public void Reconnect2Test()
232236

233237
LoopbackLayer l = new(a, b);
234238

239+
// Complete handshake
240+
a.KeepAlive();
241+
235242
la.Encode(" 1"u8, true);
236243
Assert.Equal(" 1", lb.Decoded[0]);
237244

0 commit comments

Comments
 (0)