diff --git a/ManagedCode.Communication.AspNetCore/Commands/Extensions/CommandIdempotencyExtensions.cs b/ManagedCode.Communication.AspNetCore/Commands/Extensions/CommandIdempotencyExtensions.cs index 1903c55..206c3c6 100644 --- a/ManagedCode.Communication.AspNetCore/Commands/Extensions/CommandIdempotencyExtensions.cs +++ b/ManagedCode.Communication.AspNetCore/Commands/Extensions/CommandIdempotencyExtensions.cs @@ -34,57 +34,50 @@ public static async Task ExecuteIdempotentAsync( CommandMetadata? metadata, CancellationToken cancellationToken = default) { - // Fast path: check for existing completed result - var existingResult = await store.GetCommandResultAsync(commandId, cancellationToken); - if (existingResult != null) + while (true) { - return existingResult; - } + cancellationToken.ThrowIfCancellationRequested(); - // Atomically try to claim the command for execution - var (currentStatus, wasSet) = await store.GetAndSetStatusAsync( - commandId, - CommandExecutionStatus.InProgress, - cancellationToken); + var currentStatus = await store.GetCommandStatusAsync(commandId, cancellationToken); - switch (currentStatus) - { - case CommandExecutionStatus.Completed: - // Result exists but might have been evicted, get it again - existingResult = await store.GetCommandResultAsync(commandId, cancellationToken); - return existingResult ?? throw new InvalidOperationException($"Command {commandId} marked as completed but result not found"); - - case CommandExecutionStatus.InProgress: - case CommandExecutionStatus.Processing: - // Another thread is executing, wait for completion - return await WaitForCompletionAsync(store, commandId, cancellationToken); - - case CommandExecutionStatus.Failed: - // Previous execution failed, we can retry (wasSet should be true) - if (!wasSet) + switch (currentStatus) + { + case CommandExecutionStatus.Completed: { - // Race condition - another thread claimed it - return await WaitForCompletionAsync(store, commandId, cancellationToken); + var cachedResult = await store.GetCommandResultAsync(commandId, cancellationToken); + return cachedResult ?? default!; } - break; - - case CommandExecutionStatus.NotFound: - case CommandExecutionStatus.NotStarted: - default: - // First execution (wasSet should be true) - if (!wasSet) - { - // Race condition - another thread claimed it + + case CommandExecutionStatus.InProgress: + case CommandExecutionStatus.Processing: return await WaitForCompletionAsync(store, commandId, cancellationToken); + + case CommandExecutionStatus.NotFound: + case CommandExecutionStatus.NotStarted: + case CommandExecutionStatus.Failed: + default: + { + var claimed = await store.TrySetCommandStatusAsync( + commandId, + currentStatus, + CommandExecutionStatus.InProgress, + cancellationToken); + + if (claimed) + { + goto ExecuteOperation; + } + + break; } - break; + } } - // We successfully claimed the command for execution + ExecuteOperation: try { var result = await operation(); - + // Store result and mark as completed atomically await store.SetCommandResultAsync(commandId, result, cancellationToken); await store.SetCommandStatusAsync(commandId, CommandExecutionStatus.Completed, cancellationToken); @@ -159,7 +152,7 @@ public static async Task ExecuteIdempotentWithRetryAsync( if (status == CommandExecutionStatus.Completed) { var result = await store.GetCommandResultAsync(commandId, cancellationToken); - return (result != null, result); + return (true, result); } return (false, default); @@ -192,7 +185,7 @@ public static async Task> ExecuteBatchIdempotentAsync( var operationsList = operations.ToList(); var commandIds = operationsList.Select(op => op.commandId).ToList(); - // Check for existing results in batch + var existingStatuses = await store.GetMultipleStatusAsync(commandIds, cancellationToken); var existingResults = await store.GetMultipleResultsAsync(commandIds, cancellationToken); var results = new Dictionary(); var pendingOperations = new List<(string commandId, Func> operation)>(); @@ -200,9 +193,10 @@ public static async Task> ExecuteBatchIdempotentAsync( // Separate completed from pending foreach (var (commandId, operation) in operationsList) { - if (existingResults.TryGetValue(commandId, out var existingResult) && existingResult != null) + if (existingStatuses.TryGetValue(commandId, out var status) && status == CommandExecutionStatus.Completed) { - results[commandId] = existingResult; + existingResults.TryGetValue(commandId, out var existingResult); + results[commandId] = existingResult ?? default!; } else { @@ -255,7 +249,7 @@ private static async Task WaitForCompletionAsync( { case CommandExecutionStatus.Completed: var result = await store.GetCommandResultAsync(commandId, cancellationToken); - return result ?? throw new InvalidOperationException($"Command {commandId} completed but result not found"); + return result ?? default!; case CommandExecutionStatus.Failed: throw new InvalidOperationException($"Command {commandId} failed during execution"); diff --git a/ManagedCode.Communication.Orleans/Grains/CommandIdempotencyGrain.cs b/ManagedCode.Communication.Orleans/Grains/CommandIdempotencyGrain.cs index 66d6284..5e846b6 100644 --- a/ManagedCode.Communication.Orleans/Grains/CommandIdempotencyGrain.cs +++ b/ManagedCode.Communication.Orleans/Grains/CommandIdempotencyGrain.cs @@ -26,8 +26,21 @@ public Task GetStatusAsync() public async Task TryStartProcessingAsync() { - // Check if already processing or completed - if (state.State.Status != CommandExecutionStatus.NotFound) + // Reject concurrent executions + if (state.State.Status is CommandExecutionStatus.InProgress or CommandExecutionStatus.Processing) + { + return false; + } + + // Allow retries from failed or completed states by clearing previous outcome + if (state.State.Status is CommandExecutionStatus.Completed or CommandExecutionStatus.Failed) + { + state.State.Result = null; + state.State.ErrorMessage = null; + state.State.CompletedAt = null; + state.State.FailedAt = null; + } + else if (state.State.Status is not CommandExecutionStatus.NotFound and not CommandExecutionStatus.NotStarted) { return false; } diff --git a/ManagedCode.Communication.Orleans/Stores/OrleansCommandIdempotencyStore.cs b/ManagedCode.Communication.Orleans/Stores/OrleansCommandIdempotencyStore.cs index f28db23..99bc0c8 100644 --- a/ManagedCode.Communication.Orleans/Stores/OrleansCommandIdempotencyStore.cs +++ b/ManagedCode.Communication.Orleans/Stores/OrleansCommandIdempotencyStore.cs @@ -41,7 +41,15 @@ public async Task SetCommandStatusAsync(string commandId, CommandExecutionStatus await grain.MarkFailedAsync("Status set to failed"); break; case CommandExecutionStatus.Completed: - await grain.MarkCompletedAsync(null); + var (hasResult, result) = await grain.TryGetResultAsync(); + if (hasResult) + { + await grain.MarkCompletedAsync(result); + } + else + { + await grain.MarkCompletedAsync(default); + } break; case CommandExecutionStatus.NotStarted: case CommandExecutionStatus.NotFound: @@ -175,7 +183,21 @@ public async Task MarkFailedAsync(Guid commandId, string errorMessage, Cancellat public async Task<(bool success, TResult? result)> TryGetResultAsync(Guid commandId, CancellationToken cancellationToken = default) { - var result = await GetCommandResultAsync(commandId.ToString(), cancellationToken); - return (result != null, result); + var grain = _grainFactory.GetGrain(commandId.ToString()); + var status = await grain.GetStatusAsync(); + + if (status != CommandExecutionStatus.Completed) + { + return (false, default); + } + + var (_, result) = await grain.TryGetResultAsync(); + + if (result is TResult typedResult) + { + return (true, typedResult); + } + + return (true, default); } } \ No newline at end of file diff --git a/ManagedCode.Communication.Tests/Commands/CommandIdempotencyTests.cs b/ManagedCode.Communication.Tests/Commands/CommandIdempotencyTests.cs index 9e3d466..1619ba8 100644 --- a/ManagedCode.Communication.Tests/Commands/CommandIdempotencyTests.cs +++ b/ManagedCode.Communication.Tests/Commands/CommandIdempotencyTests.cs @@ -1,5 +1,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using Shouldly; using ManagedCode.Communication.Commands; @@ -284,6 +286,145 @@ public async Task ExecuteIdempotentAsync_SecondExecution_ReturnsStoredResultWith executionCount.ShouldBe(1); // Should not execute second time } + [Fact] + public async Task ExecuteIdempotentAsync_WhenOperationReturnsNull_CachesNullResult() + { + // Arrange + const string commandId = "test-command-null-result"; + var executionCount = 0; + + var operation = new Func>(() => + { + executionCount++; + return Task.FromResult(null); + }); + + // Act + var first = await _store.ExecuteIdempotentAsync(commandId, operation); + var second = await _store.ExecuteIdempotentAsync(commandId, operation); + + // Assert + first.ShouldBeNull(); + second.ShouldBeNull(); + executionCount.ShouldBe(1); + } + + [Fact] + public async Task ExecuteIdempotentAsync_WhenOperationReturnsDefaultStructValue_CachesResult() + { + // Arrange + const string commandId = "test-command-default-struct"; + var executionCount = 0; + + var operation = new Func>(() => + { + executionCount++; + return Task.FromResult(0); + }); + + // Act + var first = await _store.ExecuteIdempotentAsync(commandId, operation); + var second = await _store.ExecuteIdempotentAsync(commandId, operation); + + // Assert + first.ShouldBe(0); + second.ShouldBe(0); + executionCount.ShouldBe(1); + } + + [Fact] + public async Task ExecuteIdempotentAsync_WhenConcurrentCallersShareCommand_WaitsForSingleExecution() + { + // Arrange + const string commandId = "concurrent-single-execution"; + var operationStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var operationCompletion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var secondaryOperationInvoked = false; + var executionCount = 0; + + var firstCall = _store.ExecuteIdempotentAsync(commandId, async () => + { + Interlocked.Increment(ref executionCount); + operationStarted.TrySetResult(); + return await operationCompletion.Task; + }); + + await operationStarted.Task; // Ensure the first invocation has claimed execution + + var secondCall = _store.ExecuteIdempotentAsync(commandId, () => + { + secondaryOperationInvoked = true; + return Task.FromResult("should-not-run"); + }); + + operationCompletion.TrySetResult("shared-result"); + + var results = await Task.WhenAll(firstCall, secondCall); + + executionCount.ShouldBe(1); + secondaryOperationInvoked.ShouldBeFalse(); + results.ShouldBe(new[] { "shared-result", "shared-result" }); + } + + [Fact] + public async Task ExecuteIdempotentAsync_WhenPrimaryExecutionFails_ConcurrentCallerReceivesFailure() + { + // Arrange + const string commandId = "concurrent-failure"; + var startSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var failingCall = _store.ExecuteIdempotentAsync(commandId, async () => + { + startSignal.TrySetResult(); + await Task.Delay(20); + throw new InvalidOperationException("boom"); + }); + + await startSignal.Task; + + var waitingCall = _store.ExecuteIdempotentAsync(commandId, () => Task.FromResult("should-not-run")); + + var waitingException = await Should.ThrowAsync(() => waitingCall); + waitingException.Message.ShouldBe("Command concurrent-failure failed during execution"); + + var primaryException = await Should.ThrowAsync(() => failingCall); + primaryException.Message.ShouldBe("boom"); + + var status = await _store.GetCommandStatusAsync(commandId); + status.ShouldBe(CommandExecutionStatus.Failed); + } + + [Fact] + public async Task ExecuteIdempotentAsync_WithDifferentCommandIds_DoesNotSerializeExecution() + { + // Arrange + const string commandId1 = "parallel-command-1"; + const string commandId2 = "parallel-command-2"; + + // Act + var stopwatch = Stopwatch.StartNew(); + + var task1 = _store.ExecuteIdempotentAsync(commandId1, async () => + { + await Task.Delay(150); + return "result-1"; + }); + + var task2 = _store.ExecuteIdempotentAsync(commandId2, async () => + { + await Task.Delay(150); + return "result-2"; + }); + + await Task.WhenAll(task1, task2); + stopwatch.Stop(); + + // Assert + stopwatch.Elapsed.ShouldBeLessThan(TimeSpan.FromMilliseconds(260)); + (await task1).ShouldBe("result-1"); + (await task2).ShouldBe("result-2"); + } + [Fact] public async Task ExecuteIdempotentAsync_WhenOperationFails_MarksCommandAsFailedAndRethrowsException() { @@ -326,6 +467,64 @@ public async Task ExecuteBatchIdempotentAsync_ExecutesMultipleOperations() results["batch-cmd-3"].ShouldBe("result-3"); } + [Fact] + public async Task ExecuteBatchIdempotentAsync_WhenCommandIsAlreadyCompleted_ReusesCachedResult() + { + // Arrange + const string cachedCommandId = "batch-cached-1"; + const string pendingCommandId = "batch-cached-2"; + await _store.SetCommandStatusAsync(cachedCommandId, CommandExecutionStatus.Completed); + await _store.SetCommandResultAsync(cachedCommandId, "cached-value"); + + var invoked = false; + + var operations = new (string commandId, Func> operation)[] + { + (cachedCommandId, () => + { + invoked = true; + return Task.FromResult("should-not-run"); + }), + (pendingCommandId, () => Task.FromResult("fresh-value")) + }; + + // Act + var results = await _store.ExecuteBatchIdempotentAsync(operations); + + // Assert + invoked.ShouldBeFalse(); + results[cachedCommandId].ShouldBe("cached-value"); + results[pendingCommandId].ShouldBe("fresh-value"); + } + + [Fact] + public async Task ExecuteBatchIdempotentAsync_WhenCachedResultIsNull_PreservesNull() + { + // Arrange + const string cachedCommandId = "batch-cached-null"; + await _store.SetCommandStatusAsync(cachedCommandId, CommandExecutionStatus.Completed); + await _store.SetCommandResultAsync(cachedCommandId, default!); + + var executed = false; + + var operations = new (string commandId, Func> operation)[] + { + (cachedCommandId, () => + { + executed = true; + return Task.FromResult("should-not-execute"); + }) + }; + + // Act + var results = await _store.ExecuteBatchIdempotentAsync(operations); + + // Assert + executed.ShouldBeFalse(); + results.ShouldHaveSingleItem(); + results[cachedCommandId].ShouldBeNull(); + } + [Fact] public async Task TryGetCachedResultAsync_WhenResultExists_ReturnsResult() { @@ -358,6 +557,22 @@ public async Task TryGetCachedResultAsync_WhenResultDoesNotExist_ReturnsNoResult result.ShouldBeNull(); } + [Fact] + public async Task TryGetCachedResultAsync_WhenResultIsNull_ReturnsHasResultTrue() + { + // Arrange + const string commandId = "test-cached-null"; + await _store.SetCommandStatusAsync(commandId, CommandExecutionStatus.Completed); + await _store.SetCommandResultAsync(commandId, default!); + + // Act + var (hasResult, result) = await _store.TryGetCachedResultAsync(commandId); + + // Assert + hasResult.ShouldBeTrue(); + result.ShouldBeNull(); + } + public void Dispose() { _store?.Dispose(); diff --git a/ManagedCode.Communication.Tests/Commands/CommandTests.cs b/ManagedCode.Communication.Tests/Commands/CommandTests.cs index 4e1f74b..aaf9b93 100644 --- a/ManagedCode.Communication.Tests/Commands/CommandTests.cs +++ b/ManagedCode.Communication.Tests/Commands/CommandTests.cs @@ -80,6 +80,50 @@ public void GenericFrom_WithCommandType_ShouldReturnCommand() .ShouldBe("value"); } + [Fact] + public void Create_ShouldStampTimestampWithUtcNow() + { + var before = DateTime.UtcNow; + + var command = Command.Create("TimestampTest"); + + var after = DateTime.UtcNow; + command.Timestamp.ShouldBeInRange(before, after); + command.Timestamp.Kind.ShouldBe(DateTimeKind.Utc); + } + + [Fact] + public void Create_ShouldUseVersion7CommandId() + { + var command = Command.Create("VersionTest"); + + GetGuidVersion(command.CommandId).ShouldBe(7); + } + + [Fact] + public void GenericCreate_WithDerivedValue_ShouldUseDerivedTypeName() + { + var payload = new DerivedPayload(); + + var command = Command.Create(payload); + + command.CommandType.ShouldBe(nameof(DerivedPayload)); + } + + private static int GetGuidVersion(Guid guid) + { + var bytes = guid.ToByteArray(); + return (bytes[7] >> 4) & 0x0F; + } + + private class BasePayload + { + } + + private sealed class DerivedPayload : BasePayload + { + } + private enum TestCommandType { Create, diff --git a/ManagedCode.Communication/Commands/Extensions/CommandIdempotencyExtensions.cs b/ManagedCode.Communication/Commands/Extensions/CommandIdempotencyExtensions.cs index d44d2ee..f7183a7 100644 --- a/ManagedCode.Communication/Commands/Extensions/CommandIdempotencyExtensions.cs +++ b/ManagedCode.Communication/Commands/Extensions/CommandIdempotencyExtensions.cs @@ -20,61 +20,54 @@ public static async Task ExecuteIdempotentAsync( Func> operation, CancellationToken cancellationToken = default) { - // Fast path: check for existing completed result - var existingResult = await store.GetCommandResultAsync(commandId, cancellationToken); - if (existingResult != null) + while (true) { - return existingResult; - } + cancellationToken.ThrowIfCancellationRequested(); - // Atomically try to claim the command for execution - var (currentStatus, wasSet) = await store.GetAndSetStatusAsync( - commandId, - CommandExecutionStatus.InProgress, - cancellationToken); + var currentStatus = await store.GetCommandStatusAsync(commandId, cancellationToken); - switch (currentStatus) - { - case CommandExecutionStatus.Completed: - // Result exists but might have been evicted, get it again - existingResult = await store.GetCommandResultAsync(commandId, cancellationToken); - return existingResult ?? throw new InvalidOperationException($"Command {commandId} marked as completed but result not found"); - - case CommandExecutionStatus.InProgress: - case CommandExecutionStatus.Processing: - // Another thread is executing, wait for completion - return await WaitForCompletionAsync(store, commandId, cancellationToken); - - case CommandExecutionStatus.Failed: - // Previous execution failed, we can retry (wasSet should be true) - if (!wasSet) + switch (currentStatus) + { + case CommandExecutionStatus.Completed: { - // Race condition - another thread claimed it - return await WaitForCompletionAsync(store, commandId, cancellationToken); + var cachedResult = await store.GetCommandResultAsync(commandId, cancellationToken); + return cachedResult ?? default!; } - break; - - case CommandExecutionStatus.NotFound: - case CommandExecutionStatus.NotStarted: - default: - // First execution (wasSet should be true) - if (!wasSet) - { - // Race condition - another thread claimed it + + case CommandExecutionStatus.InProgress: + case CommandExecutionStatus.Processing: return await WaitForCompletionAsync(store, commandId, cancellationToken); + + case CommandExecutionStatus.NotFound: + case CommandExecutionStatus.NotStarted: + case CommandExecutionStatus.Failed: + default: + { + var claimed = await store.TrySetCommandStatusAsync( + commandId, + currentStatus, + CommandExecutionStatus.InProgress, + cancellationToken); + + if (claimed) + { + goto ExecuteOperation; + } + + break; } - break; + } } - // We successfully claimed the command for execution + ExecuteOperation: try { var result = await operation(); - + // Store result and mark as completed atomically await store.SetCommandResultAsync(commandId, result, cancellationToken); await store.SetCommandStatusAsync(commandId, CommandExecutionStatus.Completed, cancellationToken); - + return result; } catch (Exception) @@ -96,7 +89,7 @@ public static async Task> ExecuteBatchIdempotentAsync( var operationsList = operations.ToList(); var commandIds = operationsList.Select(op => op.commandId).ToList(); - // Check for existing results in batch + var existingStatuses = await store.GetMultipleStatusAsync(commandIds, cancellationToken); var existingResults = await store.GetMultipleResultsAsync(commandIds, cancellationToken); var results = new Dictionary(); var pendingOperations = new List<(string commandId, Func> operation)>(); @@ -104,9 +97,10 @@ public static async Task> ExecuteBatchIdempotentAsync( // Separate completed from pending foreach (var (commandId, operation) in operationsList) { - if (existingResults.TryGetValue(commandId, out var existingResult) && existingResult != null) + if (existingStatuses.TryGetValue(commandId, out var status) && status == CommandExecutionStatus.Completed) { - results[commandId] = existingResult; + existingResults.TryGetValue(commandId, out var existingResult); + results[commandId] = existingResult ?? default!; } else { @@ -146,7 +140,7 @@ public static async Task> ExecuteBatchIdempotentAsync( if (status == CommandExecutionStatus.Completed) { var result = await store.GetCommandResultAsync(commandId, cancellationToken); - return (result != null, result); + return (true, result); } return (false, default); @@ -178,7 +172,7 @@ private static async Task WaitForCompletionAsync( { case CommandExecutionStatus.Completed: var result = await store.GetCommandResultAsync(commandId, cancellationToken); - return result ?? throw new InvalidOperationException($"Command {commandId} completed but result not found"); + return result ?? default!; case CommandExecutionStatus.Failed: throw new InvalidOperationException($"Command {commandId} failed during execution"); diff --git a/ManagedCode.Communication/Commands/Stores/MemoryCacheCommandIdempotencyStore.cs b/ManagedCode.Communication/Commands/Stores/MemoryCacheCommandIdempotencyStore.cs index 7deba8c..641db7c 100644 --- a/ManagedCode.Communication/Commands/Stores/MemoryCacheCommandIdempotencyStore.cs +++ b/ManagedCode.Communication/Commands/Stores/MemoryCacheCommandIdempotencyStore.cs @@ -84,46 +84,60 @@ public Task RemoveCommandAsync(string commandId, CancellationToken cancellationT return Task.CompletedTask; } - private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private readonly ConcurrentDictionary _commandLocks = new(); - public async Task TrySetCommandStatusAsync(string commandId, CommandExecutionStatus expectedStatus, CommandExecutionStatus newStatus, CancellationToken cancellationToken = default) + private async Task AcquireLockAsync(string commandId, CancellationToken cancellationToken) { - await _semaphore.WaitAsync(cancellationToken); + var commandLock = _commandLocks.GetOrAdd(commandId, static _ => new CommandLock()); + Interlocked.Increment(ref commandLock.RefCount); + try { - var currentStatus = _memoryCache.Get(GetStatusKey(commandId)) ?? CommandExecutionStatus.NotFound; - - if (currentStatus == expectedStatus) - { - await SetCommandStatusAsync(commandId, newStatus, cancellationToken); - return true; - } - - return false; + await commandLock.Semaphore.WaitAsync(cancellationToken); + return new LockScope(this, commandId, commandLock); } - finally + catch { - _semaphore.Release(); + ReleaseLockReference(commandId, commandLock); + throw; } } - public async Task<(CommandExecutionStatus currentStatus, bool wasSet)> GetAndSetStatusAsync(string commandId, CommandExecutionStatus newStatus, CancellationToken cancellationToken = default) + private void ReleaseLockReference(string commandId, CommandLock commandLock) { - await _semaphore.WaitAsync(cancellationToken); - try + if (Interlocked.Decrement(ref commandLock.RefCount) == 0) { - var statusKey = GetStatusKey(commandId); - var currentStatus = _memoryCache.Get(statusKey) ?? CommandExecutionStatus.NotFound; - - // Set new status - await SetCommandStatusAsync(commandId, newStatus, cancellationToken); - - return (currentStatus, true); + _commandLocks.TryRemove(new KeyValuePair(commandId, commandLock)); + commandLock.Semaphore.Dispose(); } - finally + } + + public async Task TrySetCommandStatusAsync(string commandId, CommandExecutionStatus expectedStatus, CommandExecutionStatus newStatus, CancellationToken cancellationToken = default) + { + using var scope = await AcquireLockAsync(commandId, cancellationToken); + + var currentStatus = _memoryCache.Get(GetStatusKey(commandId)) ?? CommandExecutionStatus.NotFound; + + if (currentStatus == expectedStatus) { - _semaphore.Release(); + await SetCommandStatusAsync(commandId, newStatus, cancellationToken); + return true; } + + return false; + } + + public async Task<(CommandExecutionStatus currentStatus, bool wasSet)> GetAndSetStatusAsync(string commandId, CommandExecutionStatus newStatus, CancellationToken cancellationToken = default) + { + using var scope = await AcquireLockAsync(commandId, cancellationToken); + + var statusKey = GetStatusKey(commandId); + var currentStatus = _memoryCache.Get(statusKey) ?? CommandExecutionStatus.NotFound; + + // Set new status + await SetCommandStatusAsync(commandId, newStatus, cancellationToken); + + return (currentStatus, true); } // Batch operations @@ -242,4 +256,37 @@ public void Dispose() _disposed = true; } } + + private sealed class CommandLock + { + public SemaphoreSlim Semaphore { get; } = new(1, 1); + public int RefCount; + } + + private sealed class LockScope : IDisposable + { + private readonly MemoryCacheCommandIdempotencyStore _store; + private readonly string _commandId; + private readonly CommandLock _commandLock; + private bool _disposed; + + public LockScope(MemoryCacheCommandIdempotencyStore store, string commandId, CommandLock commandLock) + { + _store = store; + _commandId = commandId; + _commandLock = commandLock; + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _commandLock.Semaphore.Release(); + _store.ReleaseLockReference(_commandId, _commandLock); + _disposed = true; + } + } } diff --git a/README.md b/README.md index 96c9cf7..e56dc51 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ Result pattern for .NET that replaces exceptions with type-safe return values. F - [API Reference](#api-reference) - [Railway-Oriented Programming](#railway-oriented-programming) - [Command Pattern and Idempotency](#command-pattern-and-idempotency) + - [Command Correlation and Tracing Identifiers](#command-correlation-and-tracing-identifiers) + - [Idempotency Architecture Overview](#idempotency-architecture-overview) - [Error Handling Patterns](#error-handling-patterns) - [Integration Guides](#integration-guides) - [Performance](#performance) @@ -565,6 +567,88 @@ if (status == CommandExecutionStatus.Completed) } ``` +### Command Correlation and Tracing Identifiers + +Commands implement `ICommand` and surface correlation, causation, trace, span, user, and session identifiers alongside optional metadata so every hop can attach observability context. The base `Command` and `Command` types keep those properties on the +root object, and serializers/Orleans surrogates round-trip them without custom plumbing. +root object, and serializers/Orleans surrogates round-trip them without custom plumbing. + +#### Identifier lifecycle +- Static command factories generate monotonic version 7 identifiers via `Guid.CreateVersion7()` and stamp a UTC timestamp so commands can be sorted chronologically even when sharded. +- Factory helpers never mutate the correlation or trace identifiers; callers opt in by supplying values through fluent `WithCorrelationId`, `WithTraceId`, and similar extension methods that return the same command instance. +- Metadata mirrors the trace/span identifiers for workload-specific diagnostics without coupling transport-level identifiers to +payload annotations. + +#### Field reference + +| Field | Purpose | Typical source | Notes | +| --- | --- | --- | --- | +| `CommandId` | Unique, monotonic identifier for deduplication | Static command factories | Remains stable for retries and storage lookups. | +| `CorrelationId` | Ties a command to an upstream workflow/request | HTTP `X-Correlation-Id`, message headers | Preserved through + serialization and Orleans surrogates. | +| `CausationId` | Records the predecessor command/event | Current command ID | Supports causal chains in telemetry. | +| `TraceId` | Connects to distributed tracing spans | OpenTelemetry/`Activity` context | The library stores, but never generate +s, trace identifiers. | +| `SpanId` | Identifies the originating span | OpenTelemetry/`Activity` context | Often paired with `Metadata.TraceId` for deep +er traces. | +| `UserId` / `SessionId` | Attach security/session principals | Authentication middleware | Useful for multi-tenant auditing. | + +#### Trace vs. correlation +- **Correlation IDs** bundle every command spawned from a single business request. Assign them at ingress and keep the value st +able across retries so dashboards can answer “what commands ran because of this call?”. +- **Trace/Span IDs** follow distributed tracing semantics. Commands avoid creating new traces and instead persist the ambient `A +ctivity` identifiers through serialization so telemetry back-ends can stitch spans together. +- Both identifier sets are serialized together, enabling pivots between business-level correlation and technical call graphs wit +hout extra configuration. + +#### Generation and propagation guidance +- Use `Command.Create(...)` / `Command.Create(...)` (or the matching `From(...)` helpers) to get a version 7 identifier and U +TC timestamp automatically. +- Read or generate correlation IDs from HTTP headers or upstream messages and apply them via `.WithCorrelationId(...)` before d +ispatching commands. +- Capture `Activity.TraceId`/`Activity.SpanId` through `.WithTraceId(...)` and `.WithSpanId(...)` (and metadata counterparts) wh +en bridging to queues, Orleans, or background pipelines. +- Serialization tests verify the identifiers round-trip, so consumers can rely on receiving the same values they emitted. + +#### Operational considerations +- Factory unit tests ensure commands created through the helpers carry version 7 identifiers, UTC timestamps, and derived `Comma +ndType` values for traceability. +- Idempotency regression tests assert that concurrent callers reuse cached results and propagate failures consistently, preservi +ng correlation integrity when retry storms occur. + +### Idempotency Architecture Overview + +#### Scope +The shared idempotency helpers (`CommandIdempotencyExtensions`), default in-memory store, and test coverage work together to pro +tect concurrency, caching, and retry behaviour across hosts. + +#### Strengths +- **Deterministic status transitions.** `ExecuteIdempotentAsync` only invokes the provided delegate after atomically claiming th +e command, writes the result, and then flips the status to `Completed`, so retries either reuse cached output or wait for the in +-flight execution to finish. +- **Batch reuse of cached outputs.** Batch helpers perform bulk status/result lookups and bypass execution for already completed + commands, even when cached results are `null` or default values. +- **Fine-grained locking in the memory store.** Per-command `SemaphoreSlim` instances eliminate global contention, and reference + counting ensures locks are released once no callers use a key. +- **Concurrency regression tests.** Dedicated unit tests confirm that concurrent callers share a single execution, failed primar +y runs surface consistent exceptions, and the final status ends up in `Failed` when appropriate. + +#### Risks & considerations +- **Missing-result ambiguity.** If a store reports `Completed` but the result entry expired, the extensions currently return the + default value. Stores that can distinguish “missing” from “stored default” should override `TryGetCachedResultAsync` to trigger + a re-execution. +- **Wait semantics rely on polling.** Adaptive polling keeps responsiveness reasonable, but distributed stores can swap in push- +style notifications if tail latency becomes critical. +- **Status retention policies.** The memory store’s cleanup removes status and result after a TTL; other implementations must pr +ovide similar hygiene to avoid unbounded growth while keeping enough history for retries. + +#### Recommendations +1. Document store-specific retention guarantees so callers can tune retry windows. +2. Consider extending the store contract with a boolean flag (or sentinel wrapper) that differentiates cached `default` values f +rom missing entries. +3. Monitor lock-pool growth in long-lived applications and log keys that never release to diagnose misbehaving callers before me +mory pressure builds up. + ## Error Handling Patterns ### Validation Pattern