diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 4595aae2a..72e31428a 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -131,13 +131,13 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) if (workItem.Session == null) { // Legacy behavior - await this.OnProcessWorkItemAsync(workItem); + await this.OnProcessWorkItemAsync(workItem, null); return; } - var isExtendedSession = false; - + var concurrencyLockAcquired = false; var processCount = 0; + SchedulerState schedulerState = null; try { while (true) @@ -145,23 +145,34 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) // While the work item contains messages that need to be processed, execute them. if (workItem.NewMessages?.Count > 0) { - bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem); - if (isCompletedOrInterrupted) + // We only need to acquire the lock on the first execution within the extended session + if (!concurrencyLockAcquired) + { + concurrencyLockAcquired = this.concurrentSessionLock.Acquire(); + } + workItem.IsExtendedSession = concurrencyLockAcquired; + // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item. + // If we failed to acquire it, we will end the extended session after this execution. + schedulerState = await this.OnProcessWorkItemAsync(workItem, schedulerState); + + // The entity has been deleted, so we end the extended session. + if (this.EntityIsDeleted(schedulerState)) { break; } + // When extended sessions are enabled, the handler caches the entity state after the first execution of the extended session, so there + // is no need to retain a reference to it here. + // We set the local reference to null so that the entity state can be garbage collected while we wait for more messages to arrive. + schedulerState.EntityState = null; + processCount++; } - // Fetches beyond the first require getting an extended session lock, used to prevent starvation. - if (processCount > 0 && !isExtendedSession) + // If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item + if (processCount > 0 && !concurrencyLockAcquired) { - isExtendedSession = this.concurrentSessionLock.Acquire(); - if (!isExtendedSession) - { - break; - } + break; } Stopwatch timer = Stopwatch.StartNew(); @@ -179,7 +190,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) } finally { - if (isExtendedSession) + if (concurrencyLockAcquired) { this.concurrentSessionLock.Release(); } @@ -208,7 +219,9 @@ internal class WorkItemEffects /// Method to process a new work item /// /// The work item to process - protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem) + /// If extended sessions are enabled, the scheduler state that is being cached across executions. + /// If they are not enabled, or if this is the first execution from within an extended session, this parameter is null. + private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, SchedulerState schedulerState) { OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState; @@ -245,19 +258,20 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work } else { + bool firstExecutionIfExtendedSession = schedulerState == null; // we start with processing all the requests and figuring out which ones to execute now // results can depend on whether the entity is locked, what the maximum batch size is, // and whether the messages arrived out of order this.DetermineWork(workItem.OrchestrationRuntimeState, - out SchedulerState schedulerState, + ref schedulerState, out Work workToDoNow); if (workToDoNow.OperationCount > 0) { // execute the user-defined operations on this entity, via the middleware - var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState); + var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState, workItem.IsExtendedSession, firstExecutionIfExtendedSession); var operationResults = result.Results!; // if we encountered an error, record it as the result of the operations @@ -417,7 +431,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( workItem.OrchestrationRuntimeState = runtimeState; } - return true; + return schedulerState; } void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request) @@ -445,7 +459,7 @@ void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) { - if (this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended) + if (this.EntityIsDeleted(schedulerState)) { // this entity scheduler is idle and the entity is deleted, so the instance and history can be removed from storage // we convey this to the durability provider by issuing a continue-as-new with null input @@ -460,10 +474,11 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) #region Preprocess to determine work - void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState schedulerState, out Work batch) + void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState schedulerState, out Work batch) { string instanceId = runtimeState.OrchestrationInstance.InstanceId; - schedulerState = new SchedulerState(); + bool deserializeState = schedulerState == null; + schedulerState ??= new(); batch = new Work(); Queue lockHolderMessages = null; @@ -474,8 +489,9 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc { case EventType.ExecutionStarted: - - if (runtimeState.Input != null) + // Only attempt to deserialize the scheduler state if we don't already have it in memory. + // This occurs on the first execution within an extended session, or when extended sessions are disabled. + if (runtimeState.Input != null && deserializeState) { try { @@ -624,6 +640,11 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc } } + bool EntityIsDeleted(SchedulerState schedulerState) + { + return schedulerState != null && this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended; + } + class Work { List operationBatch; // a (possibly empty) sequence of operations to be executed on the entity @@ -931,7 +952,7 @@ internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRunt #endregion - async Task ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState) + async Task ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState, bool isExtendedSession, bool includeEntityState) { var (operations, traceActivities) = workToDoNow.GetOperationRequestsAndTraceActivities(instance.InstanceId); // the request object that will be passed to the worker @@ -954,6 +975,7 @@ async Task ExecuteViaMiddlewareAsync(Work workToDoNow, Orches var dispatchContext = new DispatchMiddlewareContext(); dispatchContext.SetProperty(request); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = isExtendedSession, IncludeState = includeEntityState }); await this.dispatchPipeline.RunAsync(dispatchContext, async _ => { diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index b81cbae50..6cab483ba 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -780,7 +780,7 @@ async Task ExecuteOrchestrationAsync(Orchestration dispatchContext.SetProperty(workItem); dispatchContext.SetProperty(GetOrchestrationExecutionContext(runtimeState)); dispatchContext.SetProperty(this.entityParameters); - dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludePastEvents = true }); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludeState = true }); TaskOrchestrationExecutor? executor = null; @@ -833,7 +833,7 @@ async Task ResumeOrchestrationAsync(TaskOrchestrationWorkItem workItem) dispatchContext.SetProperty(cursor.TaskOrchestration); dispatchContext.SetProperty(cursor.RuntimeState); dispatchContext.SetProperty(workItem); - dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludePastEvents = false }); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludeState = false }); cursor.LatestDecisions = Enumerable.Empty(); await this.dispatchPipeline.RunAsync(dispatchContext, _ => diff --git a/src/DurableTask.Core/WorkItemMetadata.cs b/src/DurableTask.Core/WorkItemMetadata.cs index ae3de4651..da47d0d55 100644 --- a/src/DurableTask.Core/WorkItemMetadata.cs +++ b/src/DurableTask.Core/WorkItemMetadata.cs @@ -11,9 +11,10 @@ public class WorkItemMetadata public bool IsExtendedSession { get; set; } /// - /// Gets or sets whether or not to include past events in the orchestration history when executing the work item via middleware. - /// This assumes that the middleware is able to handle extended sessions and does not require history for replays. + /// Gets or sets whether or not to include instance state when executing the work item via middleware. + /// When false, this assumes that the middleware is able to handle extended sessions and has already cached + /// the instance state from a previous execution, so it does not need to be included again. /// - public bool IncludePastEvents { get; set; } + public bool IncludeState { get; set; } } }