Skip to content

Commit 10cee3f

Browse files
committed
2 parents 23b65ee + c9a7d34 commit 10cee3f

1 file changed

Lines changed: 146 additions & 3 deletions

File tree

Tests/SecurityAllSubscriptionTests.cs

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ private sealed class SecurityAllTestAdapter : MessageAdapter
2828
private readonly bool _canFilterBySecurity;
2929
private readonly Dictionary<long, MarketDataMessage> _activeSubscriptions = [];
3030

31-
public SecurityAllTestAdapter(IdGenerator transactionIdGenerator, bool canFilterBySecurity = true, bool supportCandles = false)
31+
public SecurityAllTestAdapter(IdGenerator transactionIdGenerator, bool canFilterBySecurity = true, bool supportCandles = false, bool supportDepth = false)
3232
: base(transactionIdGenerator)
3333
{
3434
_canFilterBySecurity = canFilterBySecurity;
@@ -39,6 +39,9 @@ public SecurityAllTestAdapter(IdGenerator transactionIdGenerator, bool canFilter
3939

4040
if (supportCandles)
4141
this.AddSupportedMarketDataType(TimeSpan.FromMinutes(1).TimeFrame());
42+
43+
if (supportDepth)
44+
this.AddSupportedMarketDataType(DataType.MarketDepth);
4245
}
4346

4447
public Dictionary<long, MarketDataMessage> ActiveSubscriptions => _activeSubscriptions;
@@ -112,9 +115,22 @@ await SendOutMessageAsync(new Level1ChangeMessage
112115
}
113116
.TryAdd(Level1Fields.LastTradePrice, lastPrice), cancellationToken);
114117
}
118+
119+
public async ValueTask EmitOrderBook(long subscriptionId, SecurityId secId, DateTime serverTime,
120+
QuoteChange[] bids, QuoteChange[] asks, CancellationToken cancellationToken)
121+
{
122+
await SendOutMessageAsync(new QuoteChangeMessage
123+
{
124+
SecurityId = secId,
125+
ServerTime = serverTime,
126+
Bids = bids ?? [],
127+
Asks = asks ?? [],
128+
OriginalTransactionId = subscriptionId,
129+
}, cancellationToken);
130+
}
115131
}
116132

117-
private static (TestConnector connector, SecurityAllTestAdapter adapter) CreateConnector(bool canFilterBySecurity = true, bool supportCandles = false)
133+
private static (TestConnector connector, SecurityAllTestAdapter adapter) CreateConnector(bool canFilterBySecurity = true, bool supportCandles = false, bool supportDepth = false)
118134
{
119135
var connector = new TestConnector();
120136

@@ -123,7 +139,7 @@ private static (TestConnector connector, SecurityAllTestAdapter adapter) CreateC
123139
connector.Adapter.CommissionManager = null;
124140
connector.Adapter.SendFinishedCandlesImmediatelly = true;
125141

126-
var adapter = new SecurityAllTestAdapter(connector.TransactionIdGenerator, canFilterBySecurity, supportCandles);
142+
var adapter = new SecurityAllTestAdapter(connector.TransactionIdGenerator, canFilterBySecurity, supportCandles, supportDepth);
127143
connector.Adapter.InnerAdapters.Add(adapter);
128144

129145
return (connector, adapter);
@@ -152,6 +168,8 @@ private static async Task WarmUpSecurity(SecurityAllTestAdapter adapter, long su
152168
await adapter.EmitTick(subId, secId, 1m, 1, serverTime, ct);
153169
else if (dataType == DataType.Level1)
154170
await adapter.EmitLevel1(subId, secId, 1m, serverTime, ct);
171+
else if (dataType == DataType.MarketDepth)
172+
await adapter.EmitOrderBook(subId, secId, serverTime, [new(1m, 1)], [new(2m, 1)], ct);
155173

156174
// wait for loopback child subscription to complete
157175
await Task.Delay(200, ct);
@@ -203,6 +221,56 @@ public async Task SecurityAll_SubscribeAllTicks_ReceivesMultipleSecurities()
203221

204222
#endregion
205223

224+
#region Test 1b: Subscribe all ticks, adapter can't filter, receives all securities
225+
226+
[TestMethod]
227+
[Timeout(10_000, CooperativeCancellation = true)]
228+
public async Task SecurityAll_SubscribeAllTicks_AdapterCantFilter_ReceivesAll()
229+
{
230+
var (connector, adapter) = CreateConnector(canFilterBySecurity: false);
231+
await connector.ConnectAsync(CancellationToken);
232+
233+
var sub = new Subscription(DataType.Ticks);
234+
var receivedTicks = new List<(Subscription sub, ITickTradeMessage tick)>();
235+
connector.TickTradeReceived += (s, t) => receivedTicks.Add((s, t));
236+
237+
var started = AsyncHelper.CreateTaskCompletionSource<bool>();
238+
connector.SubscriptionStarted += s => { if (ReferenceEquals(s, sub)) started.TrySetResult(true); };
239+
240+
using var runCts = new CancellationTokenSource();
241+
var run = connector.SubscribeAsync(sub, runCts.Token).AsTask();
242+
await started.Task.WithCancellation(CancellationToken);
243+
244+
var subId = await WaitForSubscription(adapter, CancellationToken);
245+
var now = DateTime.UtcNow;
246+
247+
// warm up children for all 3 securities
248+
await WarmUpSecurity(adapter, subId, AaplId, DataType.Ticks, now, CancellationToken);
249+
await WarmUpSecurity(adapter, subId, GoogId, DataType.Ticks, now.AddMilliseconds(1), CancellationToken);
250+
await WarmUpSecurity(adapter, subId, MsftId, DataType.Ticks, now.AddMilliseconds(2), CancellationToken);
251+
252+
receivedTicks.Clear();
253+
254+
// emit ticks for all 3 — all should pass through
255+
await adapter.EmitTick(subId, AaplId, 150m, 10, now.AddSeconds(1), CancellationToken);
256+
await adapter.EmitTick(subId, GoogId, 2800m, 5, now.AddSeconds(2), CancellationToken);
257+
await adapter.EmitTick(subId, MsftId, 300m, 7, now.AddSeconds(3), CancellationToken);
258+
await Task.Delay(200, CancellationToken);
259+
260+
var aaplCount = receivedTicks.Count(t => ((ISecurityIdMessage)t.tick).SecurityId == AaplId);
261+
var googCount = receivedTicks.Count(t => ((ISecurityIdMessage)t.tick).SecurityId == GoogId);
262+
var msftCount = receivedTicks.Count(t => ((ISecurityIdMessage)t.tick).SecurityId == MsftId);
263+
264+
IsTrue(aaplCount >= 1, $"Expected AAPL tick, got {aaplCount}");
265+
IsTrue(googCount >= 1, $"Expected GOOG tick, got {googCount}");
266+
IsTrue(msftCount >= 1, $"Expected MSFT tick, got {msftCount}");
267+
268+
runCts.Cancel();
269+
await run.WithCancellation(CancellationToken);
270+
}
271+
272+
#endregion
273+
206274
#region Test 2: Specific security, adapter can't filter, receives only requested
207275

208276
[TestMethod]
@@ -360,4 +428,79 @@ public async Task SecurityAll_CandlesFromTicks_MultipleSecurities()
360428
}
361429

362430
#endregion
431+
432+
#region Test 5: Late subscriber gets cached order book after snapshot already passed
433+
434+
[TestMethod]
435+
[Timeout(15_000, CooperativeCancellation = true)]
436+
public async Task SecurityAll_LateDepthSubscription_ReceivesCachedOrderBook()
437+
{
438+
var (connector, adapter) = CreateConnector(canFilterBySecurity: false, supportDepth: true);
439+
await connector.ConnectAsync(CancellationToken);
440+
441+
// 1. Subscribe to AAPL MarketDepth — adapter can't filter, so system uses SecurityAll internally
442+
var aaplSub = new Subscription(DataType.MarketDepth, new Security { Id = AaplId.ToStringId() });
443+
var receivedBooks = new List<(Subscription sub, IOrderBookMessage book)>();
444+
connector.OrderBookReceived += (s, b) => receivedBooks.Add((s, b));
445+
446+
var aaplStarted = AsyncHelper.CreateTaskCompletionSource<bool>();
447+
connector.SubscriptionStarted += s => { if (ReferenceEquals(s, aaplSub)) aaplStarted.TrySetResult(true); };
448+
449+
using var aaplCts = new CancellationTokenSource();
450+
var aaplRun = connector.SubscribeAsync(aaplSub, aaplCts.Token).AsTask();
451+
await aaplStarted.Task.WithCancellation(CancellationToken);
452+
453+
var subId = await WaitForSubscription(adapter, CancellationToken);
454+
var now = DateTime.UtcNow;
455+
456+
// 2. Adapter sends full order books for ALL instruments (it can't filter)
457+
await adapter.EmitOrderBook(subId, AaplId, now,
458+
bids: [new(150m, 10), new(149m, 20)],
459+
asks: [new(151m, 15), new(152m, 25)], CancellationToken);
460+
461+
await adapter.EmitOrderBook(subId, GoogId, now.AddMilliseconds(1),
462+
bids: [new(2800m, 5), new(2799m, 8)],
463+
asks: [new(2801m, 3), new(2802m, 7)], CancellationToken);
464+
465+
await Task.Delay(300, CancellationToken);
466+
467+
// Verify AAPL books received
468+
IsTrue(receivedBooks.Any(b => ((ISecurityIdMessage)b.book).SecurityId == AaplId), "AAPL book should arrive");
469+
470+
receivedBooks.Clear();
471+
472+
// 3. Now subscribe to GOOG MarketDepth — snapshot already passed, no new data from adapter
473+
var googSub = new Subscription(DataType.MarketDepth, new Security { Id = GoogId.ToStringId() });
474+
475+
var googStarted = AsyncHelper.CreateTaskCompletionSource<bool>();
476+
connector.SubscriptionStarted += s => { if (ReferenceEquals(s, googSub)) googStarted.TrySetResult(true); };
477+
478+
using var googCts = new CancellationTokenSource();
479+
var googRun = connector.SubscribeAsync(googSub, googCts.Token).AsTask();
480+
481+
// Wait — no new data emitted by adapter after this point
482+
await Task.Delay(500, CancellationToken);
483+
484+
// 4. Late subscriber should receive GOOG's cached order book
485+
var googBooks = receivedBooks
486+
.Where(b => ((ISecurityIdMessage)b.book).SecurityId == GoogId)
487+
.ToList();
488+
489+
IsTrue(googBooks.Count >= 1, $"Late GOOG subscriber should receive cached order book, got {googBooks.Count}");
490+
491+
// Verify the cached book has correct data
492+
var book = googBooks[0].book;
493+
var bids = book.Bids.ToArray();
494+
var asks = book.Asks.ToArray();
495+
496+
IsTrue(bids.Length >= 1, "GOOG book should have bids");
497+
IsTrue(asks.Length >= 1, "GOOG book should have asks");
498+
499+
aaplCts.Cancel();
500+
googCts.Cancel();
501+
await aaplRun.WithCancellation(CancellationToken);
502+
await googRun.WithCancellation(CancellationToken);
503+
}
504+
505+
#endregion
363506
}

0 commit comments

Comments
 (0)