diff --git a/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs b/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs index 0983e6ad9..f306064f8 100644 --- a/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs +++ b/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs @@ -904,10 +904,16 @@ public async Task ReadEventsAsync_InStreamingMode_YieldsNewlyWrittenEvents() using var cts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken); cts.CancelAfter(TimeSpan.FromSeconds(10)); var events = new List>(); + + // Use a TCS as a sync point: set when the reader has confirmed receipt of the first event. + // This guarantees the streaming enumerator is definitely active before we write events 2 and 3. + var readerStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var readTask = Task.Run(async () => { await foreach (var evt in reader.ReadEventsAsync(cts.Token)) { + readerStarted.TrySetResult(true); events.Add(evt); if (events.Count >= 3) { @@ -916,8 +922,13 @@ public async Task ReadEventsAsync_InStreamingMode_YieldsNewlyWrittenEvents() } }, CancellationToken); - // Write 3 new events - the reader should pick them up since it's in streaming mode + // Write the first event and wait for the reader to confirm it has been received. + // This establishes a synchronization point: once readerStarted is signalled, we know + // the streaming loop is running and will reliably observe any subsequently written events. var event1 = await writer.WriteEventAsync(new SseItem(null), CancellationToken); + await readerStarted.Task.WaitAsync(cts.Token); + + // Write the remaining 2 events now that the reader is confirmed active var event2 = await writer.WriteEventAsync(new SseItem(null), CancellationToken); var event3 = await writer.WriteEventAsync(new SseItem(null), CancellationToken);