Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions src/Paramore.Brighter/PipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,11 @@ private IHandleRequests<TRequest> BuildPipeline(RequestHandler<TRequest> implici
.Where(attribute => attribute.Timing == HandlerTiming.Before)
.OrderByDescending(attribute => attribute.Step);

AddGlobalInboxAttributes(ref preAttributes, implicitHandler);

s_preAttributesMemento.TryAdd(implicitHandler.Name.ToString(), preAttributes);

}

AddGlobalInboxAttributes(ref preAttributes, implicitHandler);

var firstInPipeline = PushOntoPipeline(preAttributes, implicitHandler, requestContext, instanceScope);


Expand Down Expand Up @@ -250,15 +249,11 @@ private IHandleRequestsAsync<TRequest> BuildAsyncPipeline(RequestHandlerAsync<TR
.Where(attribute => attribute.Timing == HandlerTiming.Before)
.OrderByDescending(attribute => attribute.Step);

AddGlobalInboxAttributesAsync(ref preAttributes, implicitHandler);

s_preAttributesMemento.TryAdd(implicitHandler.Name.ToString(), preAttributes);

}


AddGlobalInboxAttributesAsync(ref preAttributes, implicitHandler);

var firstInPipeline = PushOntoAsyncPipeline(preAttributes, implicitHandler, requestContext, instanceScope, continueOnCapturedContext);

if (!s_postAttributesMemento.TryGetValue(implicitHandler.Name.ToString(), out IOrderedEnumerable<RequestHandlerAttribute>? postAttributes))
Expand All @@ -278,14 +273,12 @@ private IHandleRequestsAsync<TRequest> BuildAsyncPipeline(RequestHandlerAsync<TR
private void AddGlobalInboxAttributes(ref IOrderedEnumerable<RequestHandlerAttribute> preAttributes, RequestHandler<TRequest> implicitHandler)
{
if (
_inboxConfiguration == null
_inboxConfiguration == null
|| implicitHandler.FindHandlerMethod().HasNoInboxAttributesInPipeline()
|| implicitHandler.FindHandlerMethod().HasExistingUseInboxAttributesInPipeline()
)
return;

if (_inboxConfiguration is null)
throw new ArgumentException("Inbox Configuration must be provided");
if (_inboxConfiguration.Context is null)
throw new ArgumentException("Inbox Configuration must be set");
var useInboxAttribute = new UseInboxAttribute(
Expand All @@ -294,22 +287,18 @@ private void AddGlobalInboxAttributes(ref IOrderedEnumerable<RequestHandlerAttri
onceOnly: _inboxConfiguration.OnceOnly,
timing: HandlerTiming.Before,
onceOnlyAction: _inboxConfiguration.ActionOnExists);

PushOntoAttributeList(ref preAttributes, useInboxAttribute);
}


private void AddGlobalInboxAttributesAsync(ref IOrderedEnumerable<RequestHandlerAttribute> preAttributes, RequestHandlerAsync<TRequest> implicitHandler)
{
if (_inboxConfiguration == null
if (_inboxConfiguration == null
|| implicitHandler.FindHandlerMethod().HasNoInboxAttributesInPipeline()
|| implicitHandler.FindHandlerMethod().HasExistingUseInboxAttributesInPipeline()

)
return;

if (_inboxConfiguration is null)
throw new ArgumentException("Inbox Configuration must be provided");
if (_inboxConfiguration.Context is null)
throw new ArgumentException("Inbox Configuration must be set");
var useInboxAttribute = new UseInboxAsyncAttribute(
Expand Down Expand Up @@ -372,15 +361,22 @@ private void AppendToAsyncPipeline(IEnumerable<RequestHandlerAttribute> attribut
private static void PushOntoAttributeList(ref IOrderedEnumerable<RequestHandlerAttribute> preAttributes, RequestHandlerAttribute requestHandlerAttribute)
{
var attributeList = new List<RequestHandlerAttribute>();

attributeList.Add(requestHandlerAttribute);
var minStep = int.MaxValue;

preAttributes.Each(handler =>
{
handler.Step++;
if (handler.Step < minStep)
minStep = handler.Step;
attributeList.Add(handler);
});

// Ensure the new attribute has a lower step than all existing attributes
// so it is processed last in the descending iteration (outermost in the pipeline).
// Note: requestHandlerAttribute must be a freshly created instance on every call —
// callers must not cache or reuse it, as we mutate its Step here.
requestHandlerAttribute.Step = minStep == int.MaxValue ? 0 : minStep - 1;
attributeList.Add(requestHandlerAttribute);

preAttributes = attributeList.OrderByDescending(handler => handler.Step);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Time.Testing;
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
using Paramore.Brighter.Extensions.DependencyInjection;
using Paramore.Brighter.Inbox.Handlers;
using Xunit;

namespace Paramore.Brighter.Core.Tests.CommandProcessors.Pipeline;

public class When_Building_A_Pipeline_Inbox_Cache_Does_Not_Leak_Across_Configurations
{
[Fact]
public void When_Building_A_Pipeline_With_Inbox_Then_Without_Inbox_No_Leakage()
{
PipelineBuilder<MyCommand>.ClearPipelineCache();

// Step 1: Build a pipeline WITH inbox configuration — populates the static cache
var inboxRegistry = new SubscriberRegistry();
inboxRegistry.Register<MyCommand, MyCommandHandler>();

var inboxContainer = new ServiceCollection();
inboxContainer.AddTransient<MyCommandHandler>(_ => new MyCommandHandler(new Dictionary<string, string>()));
inboxContainer.AddSingleton<IAmAnInboxSync>(new InMemoryInbox(new FakeTimeProvider()));
inboxContainer.AddTransient<UseInboxHandler<MyCommand>>();
inboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var inboxHandlerFactory = new ServiceProviderHandlerFactory(inboxContainer.BuildServiceProvider());
var inboxBuilder = new PipelineBuilder<MyCommand>(
inboxRegistry, (IAmAHandlerFactorySync)inboxHandlerFactory, new InboxConfiguration());

var withInbox = inboxBuilder.Build(new MyCommand(), new RequestContext());
var withInboxTrace = TracePipeline(withInbox.First());
Assert.Contains("UseInboxHandler`", withInboxTrace);

// Step 2: Build a pipeline WITHOUT inbox configuration — same handler type, sharing the static cache
var noInboxRegistry = new SubscriberRegistry();
noInboxRegistry.Register<MyCommand, MyCommandHandler>();

var noInboxContainer = new ServiceCollection();
noInboxContainer.AddTransient<MyCommandHandler>(_ => new MyCommandHandler(new Dictionary<string, string>()));
noInboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var noInboxHandlerFactory = new ServiceProviderHandlerFactory(noInboxContainer.BuildServiceProvider());
var noInboxBuilder = new PipelineBuilder<MyCommand>(
noInboxRegistry, (IAmAHandlerFactorySync)noInboxHandlerFactory);

var withoutInbox = noInboxBuilder.Build(new MyCommand(), new RequestContext());
var withoutInboxTrace = TracePipeline(withoutInbox.First());
Assert.DoesNotContain("UseInboxHandler`", withoutInboxTrace);
}

[Fact]
public void When_Building_A_Pipeline_Without_Inbox_Then_With_Inbox_Still_Gets_Inbox()
{
PipelineBuilder<MyCommand>.ClearPipelineCache();

// Step 1: Build a pipeline WITHOUT inbox configuration — primes the static cache
var noInboxRegistry = new SubscriberRegistry();
noInboxRegistry.Register<MyCommand, MyCommandHandler>();

var noInboxContainer = new ServiceCollection();
noInboxContainer.AddTransient<MyCommandHandler>(_ => new MyCommandHandler(new Dictionary<string, string>()));
noInboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var noInboxHandlerFactory = new ServiceProviderHandlerFactory(noInboxContainer.BuildServiceProvider());
var noInboxBuilder = new PipelineBuilder<MyCommand>(
noInboxRegistry, (IAmAHandlerFactorySync)noInboxHandlerFactory);

var withoutInbox = noInboxBuilder.Build(new MyCommand(), new RequestContext());
var withoutInboxTrace = TracePipeline(withoutInbox.First());
Assert.DoesNotContain("UseInboxHandler`", withoutInboxTrace);

// Step 2: Build a pipeline WITH inbox configuration — cache already primed without inbox
var inboxRegistry = new SubscriberRegistry();
inboxRegistry.Register<MyCommand, MyCommandHandler>();

var inboxContainer = new ServiceCollection();
inboxContainer.AddTransient<MyCommandHandler>(_ => new MyCommandHandler(new Dictionary<string, string>()));
inboxContainer.AddSingleton<IAmAnInboxSync>(new InMemoryInbox(new FakeTimeProvider()));
inboxContainer.AddTransient<UseInboxHandler<MyCommand>>();
inboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var inboxHandlerFactory = new ServiceProviderHandlerFactory(inboxContainer.BuildServiceProvider());
var inboxBuilder = new PipelineBuilder<MyCommand>(
inboxRegistry, (IAmAHandlerFactorySync)inboxHandlerFactory, new InboxConfiguration());

var withInbox = inboxBuilder.Build(new MyCommand(), new RequestContext());
var withInboxTrace = TracePipeline(withInbox.First());
Assert.Contains("UseInboxHandler`", withInboxTrace);
}

[Fact]
public void When_Building_An_Async_Pipeline_With_Inbox_Then_Without_Inbox_No_Leakage()
{
PipelineBuilder<MyCommand>.ClearPipelineCache();

// Step 1: Build an async pipeline WITH inbox configuration — populates the static cache
var inboxRegistry = new SubscriberRegistry();
inboxRegistry.RegisterAsync<MyCommand, MyCommandHandlerAsync>();

var inboxContainer = new ServiceCollection();
inboxContainer.AddSingleton(new MyCommandHandlerAsync(new Dictionary<string, string>()));
inboxContainer.AddSingleton<IAmAnInboxAsync>(new InMemoryInbox(new FakeTimeProvider()));
inboxContainer.AddTransient<UseInboxHandlerAsync<MyCommand>>();
inboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var inboxHandlerFactory = new ServiceProviderHandlerFactory(inboxContainer.BuildServiceProvider());
var inboxBuilder = new PipelineBuilder<MyCommand>(
inboxRegistry, (IAmAHandlerFactoryAsync)inboxHandlerFactory, new InboxConfiguration());

var withInbox = inboxBuilder.BuildAsync(new MyCommand(), new RequestContext(), false);
var withInboxTrace = TraceAsyncPipeline(withInbox.First());
Assert.Contains("UseInboxHandlerAsync`", withInboxTrace);

// Step 2: Build an async pipeline WITHOUT inbox configuration — same handler type, sharing the static cache
var noInboxRegistry = new SubscriberRegistry();
noInboxRegistry.RegisterAsync<MyCommand, MyCommandHandlerAsync>();

var noInboxContainer = new ServiceCollection();
noInboxContainer.AddSingleton(new MyCommandHandlerAsync(new Dictionary<string, string>()));
noInboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var noInboxHandlerFactory = new ServiceProviderHandlerFactory(noInboxContainer.BuildServiceProvider());
var noInboxBuilder = new PipelineBuilder<MyCommand>(
noInboxRegistry, (IAmAHandlerFactoryAsync)noInboxHandlerFactory);

var withoutInbox = noInboxBuilder.BuildAsync(new MyCommand(), new RequestContext(), false);
var withoutInboxTrace = TraceAsyncPipeline(withoutInbox.First());
Assert.DoesNotContain("UseInboxHandlerAsync`", withoutInboxTrace);
}

[Fact]
public void When_Building_An_Async_Pipeline_Without_Inbox_Then_With_Inbox_Still_Gets_Inbox()
{
PipelineBuilder<MyCommand>.ClearPipelineCache();

// Step 1: Build an async pipeline WITHOUT inbox configuration — primes the static cache
var noInboxRegistry = new SubscriberRegistry();
noInboxRegistry.RegisterAsync<MyCommand, MyCommandHandlerAsync>();

var noInboxContainer = new ServiceCollection();
noInboxContainer.AddSingleton(new MyCommandHandlerAsync(new Dictionary<string, string>()));
noInboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var noInboxHandlerFactory = new ServiceProviderHandlerFactory(noInboxContainer.BuildServiceProvider());
var noInboxBuilder = new PipelineBuilder<MyCommand>(
noInboxRegistry, (IAmAHandlerFactoryAsync)noInboxHandlerFactory);

var withoutInbox = noInboxBuilder.BuildAsync(new MyCommand(), new RequestContext(), false);
var withoutInboxTrace = TraceAsyncPipeline(withoutInbox.First());
Assert.DoesNotContain("UseInboxHandlerAsync`", withoutInboxTrace);

// Step 2: Build an async pipeline WITH inbox configuration — cache already primed without inbox
var inboxRegistry = new SubscriberRegistry();
inboxRegistry.RegisterAsync<MyCommand, MyCommandHandlerAsync>();

var inboxContainer = new ServiceCollection();
inboxContainer.AddSingleton(new MyCommandHandlerAsync(new Dictionary<string, string>()));
inboxContainer.AddSingleton<IAmAnInboxAsync>(new InMemoryInbox(new FakeTimeProvider()));
inboxContainer.AddTransient<UseInboxHandlerAsync<MyCommand>>();
inboxContainer.AddSingleton<IBrighterOptions>(new BrighterOptions { HandlerLifetime = ServiceLifetime.Transient });

var inboxHandlerFactory = new ServiceProviderHandlerFactory(inboxContainer.BuildServiceProvider());
var inboxBuilder = new PipelineBuilder<MyCommand>(
inboxRegistry, (IAmAHandlerFactoryAsync)inboxHandlerFactory, new InboxConfiguration());

var withInbox = inboxBuilder.BuildAsync(new MyCommand(), new RequestContext(), false);
var withInboxTrace = TraceAsyncPipeline(withInbox.First());
Assert.Contains("UseInboxHandlerAsync`", withInboxTrace);
}

private static string TracePipeline(IHandleRequests<MyCommand> firstInPipeline)
{
var pipelineTracer = new PipelineTracer();
firstInPipeline.DescribePath(pipelineTracer);
return pipelineTracer.ToString();
}

private static string TraceAsyncPipeline(IHandleRequestsAsync<MyCommand> firstInPipeline)
{
var pipelineTracer = new PipelineTracer();
firstInPipeline.DescribePath(pipelineTracer);
return pipelineTracer.ToString();
}
}
Loading