From aff4413e93fe78dc6e37ca6e76463596bdf02e0b Mon Sep 17 00:00:00 2001 From: Nathanael Mortensen Date: Tue, 25 Nov 2025 14:24:48 -0800 Subject: [PATCH] Associated DecisionFutures with the correct event loop Workflow state machines aren't processed while the event loop is active, so they have no mechanism to automatically associate with them. This is a correctness issue but it's unclear what the impacts are. --- .../workflow/statemachine/decision_manager.py | 24 ++++++++++++------- .../statemachine/decision_state_machine.py | 8 +++++-- cadence/_internal/workflow/workflow_engine.py | 8 +++---- .../_internal/workflow/workflow_intance.py | 7 ++++-- .../statemachine/test_decision_manager.py | 15 ++++++------ 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/cadence/_internal/workflow/statemachine/decision_manager.py b/cadence/_internal/workflow/statemachine/decision_manager.py index d811ee2..71f8b2e 100644 --- a/cadence/_internal/workflow/statemachine/decision_manager.py +++ b/cadence/_internal/workflow/statemachine/decision_manager.py @@ -1,6 +1,6 @@ import asyncio from collections import OrderedDict -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import Dict, Type, Tuple, ClassVar, List from cadence._internal.workflow.statemachine.activity_state_machine import ( @@ -12,6 +12,7 @@ DecisionStateMachine, DecisionType, DecisionFuture, + T, ) from cadence._internal.workflow.statemachine.event_dispatcher import ( EventDispatcher, @@ -48,7 +49,6 @@ def _create_dispatch_map( return result -@dataclass class DecisionManager: """Aggregates multiple decision state machines and coordinates decisions. @@ -64,10 +64,13 @@ class DecisionManager: DecisionType.TIMER: timer_events, } ) - state_machines: OrderedDict[DecisionId, DecisionStateMachine] = field( - default_factory=OrderedDict - ) - aliases: Dict[DecisionAlias, DecisionStateMachine] = field(default_factory=dict) + + def __init__(self, event_loop: asyncio.AbstractEventLoop): + self._event_loop = event_loop + self.state_machines: OrderedDict[DecisionId, DecisionStateMachine] = ( + OrderedDict() + ) + self.aliases: Dict[DecisionAlias, DecisionStateMachine] = dict() # ----- Activity API ----- @@ -75,7 +78,7 @@ def schedule_activity( self, attrs: decision.ScheduleActivityTaskDecisionAttributes ) -> asyncio.Future[Payload]: decision_id = DecisionId(DecisionType.ACTIVITY, attrs.activity_id) - future = DecisionFuture[Payload](lambda: self._request_cancel(decision_id)) + future: DecisionFuture[Payload] = self._create_future(decision_id) machine = ActivityStateMachine(attrs, future) self._add_state_machine(machine) @@ -87,7 +90,7 @@ def start_timer( self, attrs: decision.StartTimerDecisionAttributes ) -> asyncio.Future[None]: decision_id = DecisionId(DecisionType.TIMER, attrs.timer_id) - future = DecisionFuture[None](lambda: self._request_cancel(decision_id)) + future: DecisionFuture[None] = self._create_future(decision_id) machine = TimerStateMachine(attrs, future) self._add_state_machine(machine) @@ -149,6 +152,11 @@ def collect_pending_decisions(self) -> List[decision.Decision]: return decisions + def _create_future(self, decision_id: DecisionId) -> DecisionFuture[T]: + return DecisionFuture[T]( + self._event_loop, lambda: self._request_cancel(decision_id) + ) + def _request_cancel(self, decision_id: DecisionId) -> bool: machine = self._get_machine(decision_id) # Interactions with the state machines should move them to the end so that the decisions are ordered as they diff --git a/cadence/_internal/workflow/statemachine/decision_state_machine.py b/cadence/_internal/workflow/statemachine/decision_state_machine.py index 4bcc2c9..5dfdb47 100644 --- a/cadence/_internal/workflow/statemachine/decision_state_machine.py +++ b/cadence/_internal/workflow/statemachine/decision_state_machine.py @@ -74,8 +74,12 @@ def state(self) -> DecisionState: class DecisionFuture(asyncio.Future[T]): - def __init__(self, request_cancel: CancelFn | None = None) -> None: - super().__init__() + def __init__( + self, + loop: asyncio.AbstractEventLoop | None = None, + request_cancel: CancelFn | None = None, + ) -> None: + super().__init__(loop=loop) if request_cancel is None: request_cancel = self.force_cancel self._request_cancel = request_cancel diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index ef9f0f0..f1ebefa 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -4,6 +4,7 @@ from cadence._internal.workflow.context import Context from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator +from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop from cadence._internal.workflow.statemachine.decision_manager import DecisionManager from cadence._internal.workflow.workflow_intance import WorkflowInstance from cadence.api.v1.common_pb2 import Payload @@ -28,12 +29,11 @@ class DecisionResult: class WorkflowEngine: def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition): + self._event_loop = DeterministicEventLoop() self._workflow_instance = WorkflowInstance( - workflow_definition, info.data_converter + self._event_loop, workflow_definition, info.data_converter ) - self._decision_manager = ( - DecisionManager() - ) # TODO: remove this stateful object and use the context instead + self._decision_manager = DecisionManager(self._event_loop) self._context = Context(info, self._decision_manager) def process_decision( diff --git a/cadence/_internal/workflow/workflow_intance.py b/cadence/_internal/workflow/workflow_intance.py index ae94780..82e3441 100644 --- a/cadence/_internal/workflow/workflow_intance.py +++ b/cadence/_internal/workflow/workflow_intance.py @@ -8,12 +8,15 @@ class WorkflowInstance: def __init__( - self, workflow_definition: WorkflowDefinition, data_converter: DataConverter + self, + loop: DeterministicEventLoop, + workflow_definition: WorkflowDefinition, + data_converter: DataConverter, ): + self._loop = loop self._definition = workflow_definition self._data_converter = data_converter self._instance = workflow_definition.cls() # construct a new workflow object - self._loop = DeterministicEventLoop() self._task: Optional[Task] = None def start(self, input: Payload): diff --git a/tests/cadence/_internal/workflow/statemachine/test_decision_manager.py b/tests/cadence/_internal/workflow/statemachine/test_decision_manager.py index d0d862b..ae8cfd1 100644 --- a/tests/cadence/_internal/workflow/statemachine/test_decision_manager.py +++ b/tests/cadence/_internal/workflow/statemachine/test_decision_manager.py @@ -1,3 +1,4 @@ +import asyncio from asyncio import CancelledError import pytest @@ -8,7 +9,7 @@ async def test_activity_dispatch(): - decisions = DecisionManager() + decisions = DecisionManager(asyncio.get_event_loop()) activity_result = decisions.schedule_activity( decision.ScheduleActivityTaskDecisionAttributes(activity_id="a") @@ -22,7 +23,7 @@ async def test_activity_dispatch(): async def test_simple_cancellation(): - decisions = DecisionManager() + decisions = DecisionManager(asyncio.get_event_loop()) activity_result = decisions.schedule_activity( decision.ScheduleActivityTaskDecisionAttributes(activity_id="a") @@ -34,7 +35,7 @@ async def test_simple_cancellation(): async def test_cancellation_not_immediate(): - decisions = DecisionManager() + decisions = DecisionManager(asyncio.get_event_loop()) activity_result = decisions.schedule_activity( decision.ScheduleActivityTaskDecisionAttributes(activity_id="a") @@ -47,7 +48,7 @@ async def test_cancellation_not_immediate(): async def test_cancellation_completed(): - decisions = DecisionManager() + decisions = DecisionManager(asyncio.get_event_loop()) activity_result = decisions.schedule_activity( decision.ScheduleActivityTaskDecisionAttributes(activity_id="a") @@ -78,7 +79,7 @@ async def test_cancellation_completed(): async def test_collect_decisions(): - decisions = DecisionManager() + decisions = DecisionManager(asyncio.get_event_loop()) activity1 = decisions.schedule_activity( decision.ScheduleActivityTaskDecisionAttributes(activity_id="a") @@ -105,7 +106,7 @@ async def test_collect_decisions(): async def test_collect_decisions_ignore_empty(): - decisions = DecisionManager() + decisions = DecisionManager(asyncio.get_event_loop()) _ = decisions.schedule_activity( decision.ScheduleActivityTaskDecisionAttributes(activity_id="a") @@ -117,7 +118,7 @@ async def test_collect_decisions_ignore_empty(): async def test_collection_decisions_reordering(): # Decisions should be emitted in the order that they happened within the workflow - decisions = DecisionManager() + decisions = DecisionManager(asyncio.get_event_loop()) activity1 = decisions.schedule_activity( decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")