From 6222ab3439f27c22b69477273416b8aae294ce3c Mon Sep 17 00:00:00 2001
From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Date: Fri, 8 Nov 2024 12:59:24 -0800
Subject: [PATCH 1/2] Handle poisoned activity task
---
.../Exceptions/WorkItemPoisonedException.cs | 24 ++++++++++
src/DurableTask.Core/Logging/EventIds.cs | 1 +
src/DurableTask.Core/Logging/LogEvents.cs | 48 ++++++++++++++++++-
src/DurableTask.Core/Logging/LogHelper.cs | 19 ++++++++
.../Logging/StructuredEventSource.cs | 28 ++++++++++-
.../TaskActivityDispatcher.cs | 24 ++++++++--
6 files changed, 138 insertions(+), 6 deletions(-)
create mode 100644 src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs
diff --git a/src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs b/src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs
new file mode 100644
index 000000000..beff99826
--- /dev/null
+++ b/src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs
@@ -0,0 +1,24 @@
+// ---------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// ---------------------------------------------------------------
+
+namespace DurableTask.Core.Exceptions
+{
+ using System;
+
+ ///
+ /// Represents a work item that is poisoned and should not be retried.
+ ///
+ public class WorkItemPoisonedException : Exception
+ {
+ ///
+ /// Represents a work item that is poisoned and should not be retried.
+ ///
+ public WorkItemPoisonedException(
+ string message = "Work item is poisoned",
+ Exception innerException = null
+ ) : base(message, innerException)
+ {
+ }
+ }
+}
diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs
index f7386eb99..28cc60b80 100644
--- a/src/DurableTask.Core/Logging/EventIds.cs
+++ b/src/DurableTask.Core/Logging/EventIds.cs
@@ -60,6 +60,7 @@ static class EventIds
public const int RenewActivityMessageStarting = 65;
public const int RenewActivityMessageCompleted = 66;
public const int RenewActivityMessageFailed = 67;
+ public const int TaskActivityPoisoned = 68;
public const int SuspendingInstance = 68;
public const int ResumingInstance = 69;
diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs
index fd626d3be..468ecda38 100644
--- a/src/DurableTask.Core/Logging/LogEvents.cs
+++ b/src/DurableTask.Core/Logging/LogEvents.cs
@@ -1,4 +1,4 @@
-// ----------------------------------------------------------------------------------
+// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -1600,6 +1600,52 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}
+ internal class TaskActivityPoisoned : StructuredLogEvent, IEventSourceEvent
+ {
+ public TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent taskEvent, string details)
+ {
+ this.InstanceId = instance.InstanceId;
+ this.ExecutionId = instance.ExecutionId;
+ this.Name = taskEvent.Name;
+ this.TaskEventId = taskEvent.EventId;
+ this.Details = details;
+ }
+
+ [StructuredLogField]
+ public string InstanceId { get; }
+
+ [StructuredLogField]
+ public string ExecutionId { get; }
+
+ [StructuredLogField]
+ public string Name { get; }
+
+ [StructuredLogField]
+ public int TaskEventId { get; }
+
+ [StructuredLogField]
+ public string Details { get; }
+
+ public override EventId EventId => new EventId(
+ EventIds.TaskActivityPoisoned,
+ nameof(EventIds.TaskActivityPoisoned));
+
+ public override LogLevel Level => LogLevel.Warning;
+
+ protected override string CreateLogMessage() =>
+ $"{this.InstanceId}: Task activity {GetEventDescription(this.Name, this.TaskEventId)} is poisoned and was canceled: {this.Details}";
+
+ void IEventSourceEvent.WriteEventSource() =>
+ StructuredEventSource.Log.TaskActivityPoisoned(
+ this.InstanceId,
+ this.ExecutionId,
+ this.Name,
+ this.TaskEventId,
+ this.Details,
+ Utils.AppName,
+ Utils.PackageVersion);
+ }
+
internal class TaskActivityDispatcherError : StructuredLogEvent, IEventSourceEvent
{
public TaskActivityDispatcherError(TaskActivityWorkItem workItem, string details)
diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs
index 109c14c42..7fccc0c71 100644
--- a/src/DurableTask.Core/Logging/LogHelper.cs
+++ b/src/DurableTask.Core/Logging/LogHelper.cs
@@ -10,6 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
+
#nullable enable
namespace DurableTask.Core.Logging
{
@@ -34,6 +35,7 @@ public LogHelper(ILogger? log)
bool IsStructuredLoggingEnabled => this.log != null;
#region TaskHubWorker
+
///
/// Logs that a is starting.
///
@@ -79,6 +81,7 @@ internal void TaskHubWorkerStopped(TimeSpan latency)
this.WriteStructuredLog(new LogEvents.TaskHubWorkerStopped(latency));
}
}
+
#endregion
#region WorkItemDispatcher traces
@@ -616,6 +619,7 @@ internal void EntityLockReleased(string entityId, Core.Entities.EventFormat.Rele
#endregion
#region Activity dispatcher
+
///
/// Logs that a task activity is about to begin execution.
///
@@ -678,6 +682,20 @@ internal void TaskActivityAborted(OrchestrationInstance instance, TaskScheduledE
}
}
+ ///
+ /// Logs a warning indicating that the activity execution is poisoned and was canceled.
+ ///
+ /// The orchestration instance that scheduled this task activity.
+ /// The history event associated with this activity execution.
+ /// More information about why the execution was canceled.
+ internal void TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent taskEvent, string details)
+ {
+ if (this.IsStructuredLoggingEnabled)
+ {
+ this.WriteStructuredLog(new LogEvents.TaskActivityAborted(instance, taskEvent, details));
+ }
+ }
+
///
/// Logs that an error occurred when attempting to dispatch an activity work item.
///
@@ -728,6 +746,7 @@ internal void RenewActivityMessageFailed(TaskActivityWorkItem workItem, Exceptio
this.WriteStructuredLog(new LogEvents.RenewActivityMessageFailed(workItem, exception), exception);
}
}
+
#endregion
internal void OrchestrationDebugTrace(string instanceId, string executionId, string details)
diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs
index 162a226f2..e5f8880a0 100644
--- a/src/DurableTask.Core/Logging/StructuredEventSource.cs
+++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs
@@ -712,7 +712,7 @@ internal void EntityLockReleased(
// TODO: Use WriteEventCore for better performance
this.WriteEvent(
EventIds.EntityLockReleased,
- EntityId,
+ EntityId,
InstanceId,
Id,
AppName,
@@ -818,6 +818,32 @@ internal void TaskActivityAborted(
}
}
+ [Event(EventIds.TaskActivityPoisoned, Level = EventLevel.Warning, Version = 1)]
+ internal void TaskActivityPoisoned(
+ string InstanceId,
+ string ExecutionId,
+ string Name,
+ int TaskEventId,
+ string Details,
+ string AppName,
+ string ExtensionVersion
+ )
+ {
+ if (this.IsEnabled(EventLevel.Warning))
+ {
+ // TODO: Use WriteEventCore for better performance
+ this.WriteEvent(
+ EventIds.TaskActivityPoisoned,
+ InstanceId,
+ ExecutionId,
+ Name,
+ TaskEventId,
+ Details,
+ AppName,
+ ExtensionVersion);
+ }
+ }
+
[Event(EventIds.TaskActivityDispatcherError, Level = EventLevel.Error, Version = 1)]
internal void TaskActivityDispatcherError(
string InstanceId,
diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs
index 1c22c307f..1c21fa24e 100644
--- a/src/DurableTask.Core/TaskActivityDispatcher.cs
+++ b/src/DurableTask.Core/TaskActivityDispatcher.cs
@@ -10,6 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
+
#nullable enable
namespace DurableTask.Core
{
@@ -117,13 +118,12 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
if (taskMessage.Event.EventType != EventType.TaskScheduled)
{
this.logHelper.TaskActivityDispatcherError(
- workItem,
+ workItem,
$"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported.");
throw TraceHelper.TraceException(
TraceEventType.Critical,
"TaskActivityDispatcher-UnsupportedEventType",
- new NotSupportedException("Activity worker does not support event of type: " +
- taskMessage.Event.EventType));
+ new NotSupportedException("Activity worker does not support event of type: " + taskMessage.Event.EventType));
}
scheduledEvent = (TaskScheduledEvent)taskMessage.Event;
@@ -268,6 +268,22 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionAborted", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", e.Message);
await this.orchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
}
+ catch (WorkItemPoisonedException poisonedException) when (scheduledEvent is not null)
+ {
+ // The task activity is poisoned and should be marked as failed
+ this.logHelper.TaskActivityPoisoned(orchestrationInstance, scheduledEvent!, poisonedException.Message);
+ TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionPoisoned", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", poisonedException.Message);
+ await this.orchestrationService.CompleteTaskActivityWorkItemAsync(
+ workItem, new TaskMessage()
+ {
+ Event = new TaskFailedEvent(
+ -1,
+ // Guaranteed to be not null because of the "when" clause in the catch block
+ scheduledEvent!.EventId,
+ poisonedException.Message, string.Empty),
+ OrchestrationInstance = orchestrationInstance,
+ });
+ }
finally
{
diagnosticActivity?.Stop(); // Ensure the activity is stopped here to prevent it from leaking out.
@@ -349,4 +365,4 @@ DateTime AdjustRenewAt(DateTime renewAt)
return renewAt > maxRenewAt ? maxRenewAt : renewAt;
}
}
-}
\ No newline at end of file
+}
From c77207c2bd3f92cff958631e51453f4482169088 Mon Sep 17 00:00:00 2001
From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Date: Fri, 8 Nov 2024 13:56:57 -0800
Subject: [PATCH 2/2] Handle poisoned orchestration work items
---
src/DurableTask.Core/Logging/EventIds.cs | 1 +
src/DurableTask.Core/Logging/LogEvents.cs | 41 ++++++++++-
src/DurableTask.Core/Logging/LogHelper.cs | 17 ++++-
.../Logging/StructuredEventSource.cs | 20 ++++++
.../OrchestrationRuntimeState.cs | 21 ++++--
.../TaskOrchestrationDispatcher.cs | 71 +++++++++++++------
6 files changed, 143 insertions(+), 28 deletions(-)
diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs
index 28cc60b80..7de9aa0d4 100644
--- a/src/DurableTask.Core/Logging/EventIds.cs
+++ b/src/DurableTask.Core/Logging/EventIds.cs
@@ -51,6 +51,7 @@ static class EventIds
public const int EntityBatchExecuted = 56;
public const int EntityLockAcquired = 57;
public const int EntityLockReleased = 58;
+ public const int OrchestrationPoisoned = 59;
public const int TaskActivityStarting = 60;
public const int TaskActivityCompleted = 61;
diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs
index 468ecda38..e443ea1fc 100644
--- a/src/DurableTask.Core/Logging/LogEvents.cs
+++ b/src/DurableTask.Core/Logging/LogEvents.cs
@@ -1137,6 +1137,45 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}
+ ///
+ /// Log event representing an orchestration becoming poisoned and being marked as failed.
+ ///
+ internal class OrchestrationPoisoned : StructuredLogEvent, IEventSourceEvent
+ {
+ public OrchestrationPoisoned(OrchestrationInstance instance, string reason)
+ {
+ this.InstanceId = instance.InstanceId;
+ this.ExecutionId = instance.ExecutionId;
+ this.Details = reason;
+ }
+
+ [StructuredLogField]
+ public string InstanceId { get; }
+
+ [StructuredLogField]
+ public string ExecutionId { get; }
+
+ [StructuredLogField]
+ public string Details { get; }
+
+ public override EventId EventId => new EventId(
+ EventIds.OrchestrationPoisoned,
+ nameof(EventIds.OrchestrationPoisoned));
+
+ public override LogLevel Level => LogLevel.Warning;
+
+ protected override string CreateLogMessage() =>
+ $"{this.InstanceId}: Orchestration execution is poisoned and was marked as failed: {this.Details}";
+
+ void IEventSourceEvent.WriteEventSource() =>
+ StructuredEventSource.Log.OrchestrationPoisoned(
+ this.InstanceId,
+ this.ExecutionId,
+ this.Details,
+ Utils.AppName,
+ Utils.PackageVersion);
+ }
+
///
/// Log event representing the discarding of an orchestration message that cannot be processed.
///
@@ -1633,7 +1672,7 @@ public TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent t
public override LogLevel Level => LogLevel.Warning;
protected override string CreateLogMessage() =>
- $"{this.InstanceId}: Task activity {GetEventDescription(this.Name, this.TaskEventId)} is poisoned and was canceled: {this.Details}";
+ $"{this.InstanceId}: Task activity {GetEventDescription(this.Name, this.TaskEventId)} is poisoned and was marked as failed: {this.Details}";
void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.TaskActivityPoisoned(
diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs
index 7fccc0c71..fdf527a63 100644
--- a/src/DurableTask.Core/Logging/LogHelper.cs
+++ b/src/DurableTask.Core/Logging/LogHelper.cs
@@ -500,6 +500,19 @@ internal void OrchestrationAborted(OrchestrationInstance instance, string reason
}
}
+ ///
+ /// Logs a warning indicating that the activity execution is poisoned and was canceled.
+ ///
+ /// The orchestration instance that failed.
+ /// The reason for the orchestration execution becoming poisoned.
+ internal void OrchestrationPoisoned(OrchestrationInstance instance, string reason)
+ {
+ if (this.IsStructuredLoggingEnabled)
+ {
+ this.WriteStructuredLog(new LogEvents.OrchestrationPoisoned(instance, reason));
+ }
+ }
+
///
/// Helper method for logging the dropping of all messages associated with the specified work item.
///
@@ -687,12 +700,12 @@ internal void TaskActivityAborted(OrchestrationInstance instance, TaskScheduledE
///
/// The orchestration instance that scheduled this task activity.
/// The history event associated with this activity execution.
- /// More information about why the execution was canceled.
+ /// More information about why the execution failed.
internal void TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent taskEvent, string details)
{
if (this.IsStructuredLoggingEnabled)
{
- this.WriteStructuredLog(new LogEvents.TaskActivityAborted(instance, taskEvent, details));
+ this.WriteStructuredLog(new LogEvents.TaskActivityPoisoned(instance, taskEvent, details));
}
}
diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs
index e5f8880a0..913800aef 100644
--- a/src/DurableTask.Core/Logging/StructuredEventSource.cs
+++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs
@@ -599,6 +599,26 @@ internal void OrchestrationAborted(
}
}
+ [Event(EventIds.OrchestrationPoisoned, Level = EventLevel.Warning, Version = 1)]
+ internal void OrchestrationPoisoned(
+ string InstanceId,
+ string ExecutionId,
+ string Details,
+ string AppName,
+ string ExtensionVersion)
+ {
+ if (this.IsEnabled(EventLevel.Warning))
+ {
+ this.WriteEvent(
+ EventIds.OrchestrationPoisoned,
+ InstanceId,
+ ExecutionId,
+ Details,
+ AppName,
+ ExtensionVersion);
+ }
+ }
+
[Event(EventIds.DiscardingMessage, Level = EventLevel.Warning, Version = 1)]
internal void DiscardingMessage(
string InstanceId,
diff --git a/src/DurableTask.Core/OrchestrationRuntimeState.cs b/src/DurableTask.Core/OrchestrationRuntimeState.cs
index 494071dbd..11706b40b 100644
--- a/src/DurableTask.Core/OrchestrationRuntimeState.cs
+++ b/src/DurableTask.Core/OrchestrationRuntimeState.cs
@@ -97,6 +97,19 @@ public OrchestrationRuntimeState(IList? events)
}
}
+ ///
+ /// Returns a deep copy of the object.
+ ///
+ /// Cloned object
+ public OrchestrationRuntimeState Clone()
+ {
+ return new OrchestrationRuntimeState(this.Events)
+ {
+ Size = this.Size,
+ Status = this.Status,
+ };
+ }
+
///
/// Gets the execution started event
///
@@ -188,7 +201,7 @@ public OrchestrationStatus OrchestrationStatus
///
/// An invalid orchestration runtime state means that the history is somehow corrupted.
///
- public bool IsValid =>
+ public bool IsValid =>
this.Events.Count == 0 ||
this.Events.Count == 1 && this.Events[0].EventType == EventType.OrchestratorStarted ||
this.ExecutionStartedEvent != null;
@@ -253,8 +266,8 @@ bool IsDuplicateEvent(HistoryEvent historyEvent)
historyEvent.EventType == EventType.TaskCompleted &&
!completedEventIds.Add(historyEvent.EventId))
{
- TraceHelper.Trace(TraceEventType.Warning,
- "OrchestrationRuntimeState-DuplicateEvent",
+ TraceHelper.Trace(TraceEventType.Warning,
+ "OrchestrationRuntimeState-DuplicateEvent",
"The orchestration '{0}' has already seen a completed task with id {1}.",
this.OrchestrationInstance?.InstanceId ?? "",
historyEvent.EventId);
@@ -287,7 +300,7 @@ void SetMarkerEvents(HistoryEvent historyEvent)
// It's not generally expected to receive multiple execution completed events for a given orchestrator, but it's possible under certain race conditions.
// For example: when an orchestrator is signaled to terminate at the same time as it attempts to continue-as-new.
var log = $"Received new {completedEvent.GetType().Name} event despite the orchestration being already in the {orchestrationStatus} state.";
-
+
if (orchestrationStatus == OrchestrationStatus.ContinuedAsNew && completedEvent.OrchestrationStatus == OrchestrationStatus.Terminated)
{
// If the orchestration planned to continue-as-new but termination is requested, we transition to the terminated state.
diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
index 61864a1a5..75472c369 100644
--- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
+++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
@@ -155,7 +155,7 @@ void EnsureExecutionStartedIsFirst(IList batch)
{
// Keep track of orchestrator generation changes, maybe update target position
string executionId = message.OrchestrationInstance.ExecutionId;
- if(previousExecutionId != executionId)
+ if (previousExecutionId != executionId)
{
// We want to re-position the ExecutionStarted event after the "right-most"
// event with a non-null executionID that came before it.
@@ -217,7 +217,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
CorrelationTraceClient.Propagate(
() =>
- {
+ {
// Check if it is extended session.
// TODO: Remove this code - it looks incorrect and dangerous
isExtendedSession = this.concurrentSessionLock.Acquire();
@@ -305,7 +305,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
var isCompleted = false;
var continuedAsNew = false;
var isInterrupted = false;
-
+
// correlation
CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext);
@@ -316,11 +316,10 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
workItem.OrchestrationRuntimeState.LogHelper = this.logHelper;
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
+ OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState.Clone();
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
- OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState;
-
// Distributed tracing support: each orchestration execution is a trace activity
// that derives from an established parent trace context. It is expected that some
// listener will receive these events and publish them to a distributed trace logger.
@@ -363,7 +362,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
continuedAsNew = false;
continuedAsNewMessage = null;
-
this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
TraceHelper.TraceInstance(
TraceEventType.Verbose,
@@ -600,7 +598,9 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
// some backends expect the original runtime state object
- workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState;
+ // NOTE: In a previous version of the code, originalOrchestrationRuntimeState was pointing to the same object/memory as runtimeState
+ // We use here runtimeState to preserve the original behavior.
+ workItem.OrchestrationRuntimeState = runtimeState;
}
runtimeState.Status = runtimeState.Status ?? carryOverStatus;
@@ -610,15 +610,44 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
instanceState.Status = runtimeState.Status;
}
- await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
- workItem,
- runtimeState,
- continuedAsNew ? null : messagesToSend,
- orchestratorMessages,
- continuedAsNew ? null : timerMessages,
- continuedAsNewMessage,
- instanceState);
-
+ try
+ {
+ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
+ workItem,
+ runtimeState,
+ continuedAsNew ? null : messagesToSend,
+ orchestratorMessages,
+ continuedAsNew ? null : timerMessages,
+ continuedAsNewMessage,
+ instanceState);
+ }
+ catch (WorkItemPoisonedException poisonedException)
+ {
+ // The orchestration is poisoned and should be marked as failed
+ OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
+ this.logHelper.OrchestrationPoisoned(instance, poisonedException.Message);
+ TraceHelper.TraceInstance(TraceEventType.Warning, "TaskOrchestrationDispatcher-ExecutionPoisoned", instance, "{0}", poisonedException.Message);
+
+ OrchestrationRuntimeState failedRuntimeState = originalOrchestrationRuntimeState.Clone();
+ failedRuntimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ failedRuntimeState.AddEvent(new ExecutionCompletedEvent(
+ -1, poisonedException.Message, OrchestrationStatus.Failed, new FailureDetails(poisonedException)));
+ failedRuntimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+
+ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
+ workItem,
+ failedRuntimeState,
+ outboundMessages: Array.Empty(),
+ orchestratorMessages: Array.Empty(),
+ timerMessages: Array.Empty(),
+ continuedAsNewMessage: null,
+ instanceState);
+
+ isCompleted = false;
+ continuedAsNew = false;
+ isInterrupted = true;
+ }
+
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
workItem.OrchestrationRuntimeState = runtimeState;
@@ -1143,11 +1172,11 @@ TaskMessage ProcessSendEventDecision(
{
var historyEvent = new EventSentEvent(sendEventAction.Id)
{
- InstanceId = sendEventAction.Instance?.InstanceId,
- Name = sendEventAction.EventName,
- Input = sendEventAction.EventData
+ InstanceId = sendEventAction.Instance?.InstanceId,
+ Name = sendEventAction.EventName,
+ Input = sendEventAction.EventData
};
-
+
runtimeState.AddEvent(historyEvent);
EventRaisedEvent eventRaisedEvent = new EventRaisedEvent(-1, sendEventAction.EventData)
@@ -1169,7 +1198,7 @@ TaskMessage ProcessSendEventDecision(
Event = eventRaisedEvent
};
}
-
+
internal class NonBlockingCountdownLock
{
int available;