11using FluentAssertions ;
22using Microsoft . Extensions . DependencyInjection ;
33using Microsoft . Extensions . Logging ;
4+ using Microsoft . Extensions . Logging . Abstractions ;
45using Xunit ;
56using Xunit . Abstractions ;
67
@@ -238,11 +239,47 @@ public void CanRejectLargeMessages()
238239 p . TryEnqueue ( ByteArray50 ) . Should ( ) . BeFalse ( ) ; // failed here
239240 }
240241
242+ [ Fact ]
243+ [ TestBeforeAfter ]
244+ public void CanRecoverIfPublisherCrashes ( )
245+ {
246+ // This is very complicated test that is trying to replicate a crash scenario when the publisher
247+ // crashes after indicating that it is writing the message but before completing the operation.
248+
249+ using var dp = new DeadlockCausingPublisher ( new ( "qn" , fixture . Path , 1024 ) , NullLoggerFactory . Instance ) ;
250+ dp . TryEnqueue ( ByteArray3 ) . Should ( ) . BeTrue ( ) ;
251+
252+ using var p = CreatePublisher ( 1024 ) ;
253+ p . TryEnqueue ( ByteArray1 ) . Should ( ) . BeTrue ( ) ;
254+ using var s = CreateSubscriber ( 1024 ) ;
255+
256+ // This line should take 10 seconds to return (that is how long the timeout is set in the code)
257+ // After the 10 seconds expires, we should have lost all other messages that were in the queue when we started the dequeue process.
258+ s . TryDequeue ( default , out _ ) . Should ( ) . BeFalse ( ) ;
259+
260+ // But then, after this 10 seconds delay, system should fully recover and continue with new messages
261+ p . TryEnqueue ( ByteArray1 ) . Should ( ) . BeTrue ( ) ;
262+ s . TryDequeue ( default , out var message ) . Should ( ) . BeTrue ( ) ;
263+ message . ToArray ( ) . Should ( ) . BeEquivalentTo ( ByteArray1 ) ;
264+ }
265+
241266 private IPublisher CreatePublisher ( long capacity ) =>
242- queueFactory . CreatePublisher (
243- new QueueOptions ( "qn" , fixture . Path , capacity ) ) ;
267+ queueFactory . CreatePublisher ( new ( "qn" , fixture . Path , capacity ) ) ;
244268
245269 private ISubscriber CreateSubscriber ( long capacity ) =>
246- queueFactory . CreateSubscriber (
247- new QueueOptions ( "qn" , fixture . Path , capacity ) ) ;
270+ queueFactory . CreateSubscriber ( new ( "qn" , fixture . Path , capacity ) ) ;
271+
272+ private sealed class DeadlockCausingPublisher ( QueueOptions options , ILoggerFactory loggerFactory ) :
273+ Queue ( options , loggerFactory ) ,
274+ IPublisher
275+ {
276+ public unsafe bool TryEnqueue ( ReadOnlySpan < byte > message )
277+ {
278+ var bodyLength = message . Length ;
279+ var messageLength = GetPaddedMessageLength ( bodyLength ) ;
280+ var header = * Header ;
281+ Header ->WriteOffset = SafeIncrementMessageOffset ( header . WriteOffset , messageLength ) ;
282+ return true ;
283+ }
284+ }
248285}
0 commit comments