Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,57 +34,50 @@ public static async Task<T> ExecuteIdempotentAsync<T>(
CommandMetadata? metadata,
CancellationToken cancellationToken = default)
{
// Fast path: check for existing completed result
var existingResult = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
if (existingResult != null)
while (true)
{
return existingResult;
}
cancellationToken.ThrowIfCancellationRequested();

// Atomically try to claim the command for execution
var (currentStatus, wasSet) = await store.GetAndSetStatusAsync(
commandId,
CommandExecutionStatus.InProgress,
cancellationToken);
var currentStatus = await store.GetCommandStatusAsync(commandId, cancellationToken);

switch (currentStatus)
{
case CommandExecutionStatus.Completed:
// Result exists but might have been evicted, get it again
existingResult = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
return existingResult ?? throw new InvalidOperationException($"Command {commandId} marked as completed but result not found");

case CommandExecutionStatus.InProgress:
case CommandExecutionStatus.Processing:
// Another thread is executing, wait for completion
return await WaitForCompletionAsync<T>(store, commandId, cancellationToken);

case CommandExecutionStatus.Failed:
// Previous execution failed, we can retry (wasSet should be true)
if (!wasSet)
switch (currentStatus)
{
case CommandExecutionStatus.Completed:
{
// Race condition - another thread claimed it
return await WaitForCompletionAsync<T>(store, commandId, cancellationToken);
var cachedResult = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
return cachedResult ?? default!;
}
break;

case CommandExecutionStatus.NotFound:
case CommandExecutionStatus.NotStarted:
default:
// First execution (wasSet should be true)
if (!wasSet)
{
// Race condition - another thread claimed it

case CommandExecutionStatus.InProgress:
case CommandExecutionStatus.Processing:
return await WaitForCompletionAsync<T>(store, commandId, cancellationToken);

case CommandExecutionStatus.NotFound:
case CommandExecutionStatus.NotStarted:
case CommandExecutionStatus.Failed:
default:
{
var claimed = await store.TrySetCommandStatusAsync(
commandId,
currentStatus,
CommandExecutionStatus.InProgress,
cancellationToken);

if (claimed)
{
goto ExecuteOperation;
}

break;
}
break;
}
}

// We successfully claimed the command for execution
ExecuteOperation:
try
{
var result = await operation();

// Store result and mark as completed atomically
await store.SetCommandResultAsync(commandId, result, cancellationToken);
await store.SetCommandStatusAsync(commandId, CommandExecutionStatus.Completed, cancellationToken);
Expand Down Expand Up @@ -159,7 +152,7 @@ public static async Task<T> ExecuteIdempotentWithRetryAsync<T>(
if (status == CommandExecutionStatus.Completed)
{
var result = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
return (result != null, result);
return (true, result);
}

return (false, default);
Expand Down Expand Up @@ -192,17 +185,18 @@ public static async Task<Dictionary<string, T>> ExecuteBatchIdempotentAsync<T>(
var operationsList = operations.ToList();
var commandIds = operationsList.Select(op => op.commandId).ToList();

// Check for existing results in batch
var existingStatuses = await store.GetMultipleStatusAsync(commandIds, cancellationToken);
var existingResults = await store.GetMultipleResultsAsync<T>(commandIds, cancellationToken);
var results = new Dictionary<string, T>();
var pendingOperations = new List<(string commandId, Func<Task<T>> operation)>();

// Separate completed from pending
foreach (var (commandId, operation) in operationsList)
{
if (existingResults.TryGetValue(commandId, out var existingResult) && existingResult != null)
if (existingStatuses.TryGetValue(commandId, out var status) && status == CommandExecutionStatus.Completed)
{
results[commandId] = existingResult;
existingResults.TryGetValue(commandId, out var existingResult);
results[commandId] = existingResult ?? default!;
}
else
{
Expand Down Expand Up @@ -255,7 +249,7 @@ private static async Task<T> WaitForCompletionAsync<T>(
{
case CommandExecutionStatus.Completed:
var result = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
return result ?? throw new InvalidOperationException($"Command {commandId} completed but result not found");
return result ?? default!;

case CommandExecutionStatus.Failed:
throw new InvalidOperationException($"Command {commandId} failed during execution");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,21 @@ public Task<CommandExecutionStatus> GetStatusAsync()

public async Task<bool> TryStartProcessingAsync()
{
// Check if already processing or completed
if (state.State.Status != CommandExecutionStatus.NotFound)
// Reject concurrent executions
if (state.State.Status is CommandExecutionStatus.InProgress or CommandExecutionStatus.Processing)
{
return false;
}

// Allow retries from failed or completed states by clearing previous outcome
if (state.State.Status is CommandExecutionStatus.Completed or CommandExecutionStatus.Failed)
{
state.State.Result = null;
state.State.ErrorMessage = null;
state.State.CompletedAt = null;
state.State.FailedAt = null;
}
else if (state.State.Status is not CommandExecutionStatus.NotFound and not CommandExecutionStatus.NotStarted)
{
Comment on lines 27 to 44

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent retries when command already succeeded

TryStartProcessingAsync now returns true when the command state is Completed or Failed, clearing the stored result and error metadata before flipping the status back to Processing. This allows the same command id to execute again and overwrites the cached outcome that idempotency is intended to preserve. Callers that rely on TryStartProcessingAsync to guard duplicate execution will re‑run completed commands instead of short‑circuiting, breaking idempotent guarantees and potentially reissuing side effects.

Useful? React with 👍 / 👎.

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@ public async Task SetCommandStatusAsync(string commandId, CommandExecutionStatus
await grain.MarkFailedAsync("Status set to failed");
break;
case CommandExecutionStatus.Completed:
await grain.MarkCompletedAsync<object?>(null);
var (hasResult, result) = await grain.TryGetResultAsync();
if (hasResult)
{
await grain.MarkCompletedAsync(result);
}
else
{
await grain.MarkCompletedAsync<object?>(default);
}
break;
case CommandExecutionStatus.NotStarted:
case CommandExecutionStatus.NotFound:
Expand Down Expand Up @@ -175,7 +183,21 @@ public async Task MarkFailedAsync(Guid commandId, string errorMessage, Cancellat

public async Task<(bool success, TResult? result)> TryGetResultAsync<TResult>(Guid commandId, CancellationToken cancellationToken = default)
{
var result = await GetCommandResultAsync<TResult>(commandId.ToString(), cancellationToken);
return (result != null, result);
var grain = _grainFactory.GetGrain<ICommandIdempotencyGrain>(commandId.ToString());
var status = await grain.GetStatusAsync();

if (status != CommandExecutionStatus.Completed)
{
return (false, default);
}

var (_, result) = await grain.TryGetResultAsync();

if (result is TResult typedResult)
{
return (true, typedResult);
}

return (true, default);
}
}
Loading