22using PerfProblemSimulator . Hubs ;
33using PerfProblemSimulator . Models ;
44using PerfProblemSimulator . Services ;
5+ using System . Collections . Concurrent ;
56
67namespace PerfProblemSimulator . Services ;
78
@@ -17,21 +18,32 @@ namespace PerfProblemSimulator.Services;
1718/// It subscribes to metrics events and broadcasts them to all connected clients.
1819/// </para>
1920/// <para>
20- /// <strong>Why a separate service?</strong>
21+ /// <strong>Thread Pool Independence (Critical for Load Testing):</strong>
22+ /// </para>
23+ /// <para>
24+ /// When the load test endpoint exhausts the thread pool, SignalR broadcasts would
25+ /// normally freeze because they rely on thread pool threads. To prevent this, we use:
2126/// </para>
2227/// <list type="bullet">
23- /// <item>MetricsCollector is focused on data collection (single responsibility)</item>
24- /// <item>SignalR hub context can be injected here safely</item>
25- /// <item>Allows metrics collection to work even with no connected clients</item>
26- /// <item>Graceful startup/shutdown via IHostedService lifecycle</item>
28+ /// <item>A dedicated broadcast thread (not from thread pool)</item>
29+ /// <item>A message queue (BlockingCollection) for thread-safe message passing</item>
30+ /// <item>Non-blocking enqueue in the event handlers; each send is completed synchronously on the broadcast thread instead of awaiting thread pool continuations</item>
2731/// </list>
32+ /// <para>
33+ /// This ensures the dashboard continues updating even during severe thread pool starvation.
34+ /// </para>
2835/// </remarks>
2936public class MetricsBroadcastService : IHostedService
3037{
/// <summary>Collector whose <c>MetricsCollected</c> event feeds the broadcast queue.</summary>
private readonly IMetricsCollector _metricsCollector;

/// <summary>Tracker whose simulation started/completed events feed the broadcast queue.</summary>
private readonly ISimulationTracker _simulationTracker;

/// <summary>Hub context used to send messages to all connected clients.</summary>
private readonly IHubContext<MetricsHub, IMetricsClient> _hubContext;

/// <summary>Logger for this service.</summary>
private readonly ILogger<MetricsBroadcastService> _logger;

/// <summary>
/// Bounded hand-off queue (at most 100 pending messages) between the event
/// handlers and the dedicated broadcast thread.
/// </summary>
private readonly BlockingCollection<BroadcastMessage> _messageQueue =
    new BlockingCollection<BroadcastMessage>(boundedCapacity: 100);

/// <summary>Dedicated broadcast thread; assigned when the service starts.</summary>
private Thread? _broadcastThread;

/// <summary>
/// Run flag polled by the broadcast loop; volatile because it is written by
/// start/stop and read from the broadcast thread.
/// </summary>
private volatile bool _running;
3547
3648 /// <summary>
3749 /// Initializes a new instance of the <see cref="MetricsBroadcastService"/> class.
@@ -51,62 +63,155 @@ public MetricsBroadcastService(
/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken)
{
    // Raise the run flag before the loop thread exists so it never observes "stopped" on entry.
    _running = true;

    // A manually created thread, so broadcasting does not need a thread pool thread.
    var worker = new Thread(BroadcastLoop);
    worker.Name = "SignalR-Broadcast";
    worker.IsBackground = true;
    worker.Priority = ThreadPriority.AboveNormal; // favor dashboard updates
    _broadcastThread = worker;
    worker.Start();

    // Hook up the producers only once the consumer is running, then start collecting.
    _metricsCollector.MetricsCollected += OnMetricsCollected;
    _simulationTracker.SimulationStarted += OnSimulationStarted;
    _simulationTracker.SimulationCompleted += OnSimulationCompleted;
    _metricsCollector.Start();

    _logger.LogInformation("Metrics broadcast service started with dedicated broadcast thread ");
    return Task.CompletedTask;
}
6285
/// <inheritdoc />
public Task StopAsync(CancellationToken cancellationToken)
{
    // Detach the producers BEFORE closing the queue. BlockingCollection.TryAdd throws
    // InvalidOperationException once CompleteAdding has been called, so an event raised
    // while the handlers are still subscribed would otherwise throw into the event source.
    _metricsCollector.MetricsCollected -= OnMetricsCollected;
    _simulationTracker.SimulationStarted -= OnSimulationStarted;
    _simulationTracker.SimulationCompleted -= OnSimulationCompleted;
    _metricsCollector.Stop();

    // Signal the broadcast loop to finish once the already-queued messages are drained.
    _running = false;
    _messageQueue.CompleteAdding();

    // Wait for the broadcast thread to finish, but never hang shutdown indefinitely.
    var joinTimeout = TimeSpan.FromSeconds(5);
    if (_broadcastThread is { } worker && !worker.Join(joinTimeout))
    {
        _logger.LogWarning(
            "Broadcast thread did not exit within {TimeoutSeconds} seconds",
            joinTimeout.TotalSeconds);
    }

    _logger.LogInformation("Metrics broadcast service stopped");
    return Task.CompletedTask;
}
74-
75- private async void OnMetricsCollected ( object ? sender , MetricsSnapshot snapshot )
103+
/// <summary>
/// Body of the dedicated broadcast thread: takes queued messages one at a time
/// and hands each to <see cref="ProcessMessage"/>. Keeps going while the service
/// is running, then drains whatever is still queued before returning.
/// </summary>
private void BroadcastLoop()
{
    _logger.LogDebug("Broadcast thread started");

    // Short wait per take so the run flag is re-checked regularly.
    var pollInterval = TimeSpan.FromMilliseconds(100);

    while (true)
    {
        // Leave only when stop was requested AND nothing is left to send.
        if (!_running && _messageQueue.Count == 0)
        {
            break;
        }

        try
        {
            if (!_messageQueue.TryTake(out var pending, pollInterval))
            {
                // Timed out with nothing queued - go back and re-check the exit condition.
                continue;
            }

            ProcessMessage(pending);
        }
        catch (InvalidOperationException)
        {
            // The queue can no longer be consumed (this also covers
            // ObjectDisposedException, which derives from it) - stop the loop.
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in broadcast loop");
        }
    }

    _logger.LogDebug("Broadcast thread exiting");
}
86-
87- private async void OnSimulationStarted ( object ? sender , SimulationEventArgs e )
135+
/// <summary>
/// Sends a single queued message to all connected clients.
/// </summary>
/// <remarks>
/// Each send is completed with <c>GetAwaiter().GetResult()</c> so the calling
/// (broadcast) thread blocks until the send finishes instead of scheduling a
/// continuation. Failures are logged and swallowed so that one bad message
/// cannot terminate the broadcast loop.
/// </remarks>
/// <param name="message">The queued message; its payload must match its type.</param>
private void ProcessMessage(BroadcastMessage message)
{
    try
    {
        // Match on type AND payload together: a null or mismatched payload falls
        // through to the default branch instead of being forwarded to clients.
        switch (message)
        {
            case { Type: BroadcastType.Metrics, Data: MetricsSnapshot snapshot }:
                _hubContext.Clients.All.ReceiveMetrics(snapshot).GetAwaiter().GetResult();
                break;

            case { Type: BroadcastType.SimulationStarted, Data: SimulationEventArgs started }:
                _hubContext.Clients.All.SimulationStarted(started.Type.ToString(), started.SimulationId).GetAwaiter().GetResult();
                _logger.LogDebug("Broadcast SimulationStarted: {Type} {Id}", started.Type, started.SimulationId);
                break;

            case { Type: BroadcastType.SimulationCompleted, Data: SimulationEventArgs completed }:
                _hubContext.Clients.All.SimulationCompleted(completed.Type.ToString(), completed.SimulationId).GetAwaiter().GetResult();
                _logger.LogDebug("Broadcast SimulationCompleted: {Type} {Id}", completed.Type, completed.SimulationId);
                break;

            case { Type: BroadcastType.Latency, Data: LatencyMeasurement latency }:
                _hubContext.Clients.All.ReceiveLatency(latency).GetAwaiter().GetResult();
                break;

            case { Type: BroadcastType.SlowRequestLatency, Data: SlowRequestLatencyData slowRequest }:
                _hubContext.Clients.All.ReceiveSlowRequestLatency(slowRequest).GetAwaiter().GetResult();
                break;

            case { Type: BroadcastType.LoadTestStats, Data: LoadTestStatsData loadTestStats }:
                _hubContext.Clients.All.ReceiveLoadTestStats(loadTestStats).GetAwaiter().GetResult();
                break;

            default:
                // Unknown type, or a payload that is null / of the wrong type for its type tag.
                _logger.LogWarning(
                    "Skipping {Type} message with missing or unexpected payload",
                    message.Type);
                break;
        }
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Error broadcasting {Type} message", message.Type);
    }
}
99180
100- private async void OnSimulationCompleted ( object ? sender , SimulationEventArgs e )
/// <summary>
/// Queues a metrics snapshot for the broadcast thread without blocking the caller.
/// </summary>
private void OnMetricsCollected(object? sender, MetricsSnapshot snapshot)
{
    // When the queue is full or closed, the INCOMING snapshot is the one discarded;
    // messages already queued are kept.
    if (!TryEnqueue(BroadcastType.Metrics, snapshot))
    {
        _logger.LogTrace("Broadcast queue full or closed, dropping metrics update");
    }
}

/// <summary>
/// Queues a simulation-started notification for the broadcast thread.
/// </summary>
private void OnSimulationStarted(object? sender, SimulationEventArgs e)
{
    if (!TryEnqueue(BroadcastType.SimulationStarted, e))
    {
        // Log the drop so a missed lifecycle event is visible rather than silent.
        _logger.LogWarning(
            "Broadcast queue full or closed, dropping SimulationStarted: {Type} {Id}",
            e.Type,
            e.SimulationId);
    }
}

/// <summary>
/// Queues a simulation-completed notification for the broadcast thread.
/// </summary>
private void OnSimulationCompleted(object? sender, SimulationEventArgs e)
{
    if (!TryEnqueue(BroadcastType.SimulationCompleted, e))
    {
        // Log the drop so a missed lifecycle event is visible rather than silent.
        _logger.LogWarning(
            "Broadcast queue full or closed, dropping SimulationCompleted: {Type} {Id}",
            e.Type,
            e.SimulationId);
    }
}

/// <summary>
/// Attempts to add a message to the broadcast queue without blocking and without
/// throwing into the event source.
/// </summary>
/// <param name="type">Kind of message being queued.</param>
/// <param name="data">Payload matching <paramref name="type"/>.</param>
/// <returns>
/// <c>true</c> if the message was queued; <c>false</c> if the queue was full or
/// has been closed for adding.
/// </returns>
private bool TryEnqueue(BroadcastType type, object? data)
{
    if (_messageQueue.IsAddingCompleted)
    {
        return false;
    }

    try
    {
        return _messageQueue.TryAdd(new BroadcastMessage(type, data));
    }
    catch (InvalidOperationException)
    {
        // The queue was closed (or disposed) between the check above and TryAdd;
        // treat it the same as a dropped message.
        return false;
    }
}
199+
/// <summary>
/// Discriminator that tells the broadcast thread which client call a queued message maps to.
/// </summary>
private enum BroadcastType
{
    /// <summary>Payload is a <c>MetricsSnapshot</c>.</summary>
    Metrics = 0,

    /// <summary>Payload is a <c>SimulationEventArgs</c> for a simulation that started.</summary>
    SimulationStarted = 1,

    /// <summary>Payload is a <c>SimulationEventArgs</c> for a simulation that completed.</summary>
    SimulationCompleted = 2,

    /// <summary>Payload is a <c>LatencyMeasurement</c>.</summary>
    Latency = 3,

    /// <summary>Payload is a <c>SlowRequestLatencyData</c>.</summary>
    SlowRequestLatency = 4,

    /// <summary>Payload is a <c>LoadTestStatsData</c>.</summary>
    LoadTestStats = 5
}
212+
/// <summary>
/// Envelope placed on the broadcast queue: a type tag plus its untyped payload.
/// </summary>
/// <param name="Type">Which kind of broadcast this message represents.</param>
/// <param name="Data">Payload for the broadcast; cast according to <paramref name="Type"/> when sent.</param>
private record BroadcastMessage(
    BroadcastType Type,
    object? Data);
112217}
0 commit comments