Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions cadence/_internal/workflow/statemachine/decision_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from collections import OrderedDict
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Dict, Type, Tuple, ClassVar, List

from cadence._internal.workflow.statemachine.activity_state_machine import (
Expand All @@ -12,6 +12,7 @@
DecisionStateMachine,
DecisionType,
DecisionFuture,
T,
)
from cadence._internal.workflow.statemachine.event_dispatcher import (
EventDispatcher,
Expand Down Expand Up @@ -48,7 +49,6 @@ def _create_dispatch_map(
return result


@dataclass
class DecisionManager:
"""Aggregates multiple decision state machines and coordinates decisions.

Expand All @@ -64,18 +64,21 @@ class DecisionManager:
DecisionType.TIMER: timer_events,
}
)
state_machines: OrderedDict[DecisionId, DecisionStateMachine] = field(
default_factory=OrderedDict
)
aliases: Dict[DecisionAlias, DecisionStateMachine] = field(default_factory=dict)

def __init__(self, event_loop: asyncio.AbstractEventLoop):
self._event_loop = event_loop
self.state_machines: OrderedDict[DecisionId, DecisionStateMachine] = (
OrderedDict()
)
self.aliases: Dict[DecisionAlias, DecisionStateMachine] = dict()

# ----- Activity API -----

def schedule_activity(
self, attrs: decision.ScheduleActivityTaskDecisionAttributes
) -> asyncio.Future[Payload]:
decision_id = DecisionId(DecisionType.ACTIVITY, attrs.activity_id)
future = DecisionFuture[Payload](lambda: self._request_cancel(decision_id))
future: DecisionFuture[Payload] = self._create_future(decision_id)
machine = ActivityStateMachine(attrs, future)
self._add_state_machine(machine)

Expand All @@ -87,7 +90,7 @@ def start_timer(
self, attrs: decision.StartTimerDecisionAttributes
) -> asyncio.Future[None]:
decision_id = DecisionId(DecisionType.TIMER, attrs.timer_id)
future = DecisionFuture[None](lambda: self._request_cancel(decision_id))
future: DecisionFuture[None] = self._create_future(decision_id)
machine = TimerStateMachine(attrs, future)
self._add_state_machine(machine)

Expand Down Expand Up @@ -149,6 +152,11 @@ def collect_pending_decisions(self) -> List[decision.Decision]:

return decisions

def _create_future(self, decision_id: DecisionId) -> DecisionFuture[T]:
return DecisionFuture[T](
self._event_loop, lambda: self._request_cancel(decision_id)
)

def _request_cancel(self, decision_id: DecisionId) -> bool:
machine = self._get_machine(decision_id)
# Interactions with the state machines should move them to the end so that the decisions are ordered as they
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,12 @@ def state(self) -> DecisionState:


class DecisionFuture(asyncio.Future[T]):
def __init__(self, request_cancel: CancelFn | None = None) -> None:
super().__init__()
def __init__(
self,
loop: asyncio.AbstractEventLoop | None = None,
request_cancel: CancelFn | None = None,
) -> None:
super().__init__(loop=loop)
if request_cancel is None:
request_cancel = self.force_cancel
self._request_cancel = request_cancel
Expand Down
8 changes: 4 additions & 4 deletions cadence/_internal/workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from cadence._internal.workflow.context import Context
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
from cadence._internal.workflow.workflow_intance import WorkflowInstance
from cadence.api.v1.common_pb2 import Payload
Expand All @@ -28,12 +29,11 @@ class DecisionResult:

class WorkflowEngine:
def __init__(self, info: WorkflowInfo, workflow_definition: WorkflowDefinition):
self._event_loop = DeterministicEventLoop()
self._workflow_instance = WorkflowInstance(
workflow_definition, info.data_converter
self._event_loop, workflow_definition, info.data_converter
)
self._decision_manager = (
DecisionManager()
) # TODO: remove this stateful object and use the context instead
self._decision_manager = DecisionManager(self._event_loop)
self._context = Context(info, self._decision_manager)

def process_decision(
Expand Down
7 changes: 5 additions & 2 deletions cadence/_internal/workflow/workflow_intance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

class WorkflowInstance:
def __init__(
self, workflow_definition: WorkflowDefinition, data_converter: DataConverter
self,
loop: DeterministicEventLoop,
workflow_definition: WorkflowDefinition,
data_converter: DataConverter,
):
self._loop = loop
self._definition = workflow_definition
self._data_converter = data_converter
self._instance = workflow_definition.cls() # construct a new workflow object
self._loop = DeterministicEventLoop()
self._task: Optional[Task] = None

def start(self, input: Payload):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from asyncio import CancelledError

import pytest
Expand All @@ -8,7 +9,7 @@


async def test_activity_dispatch():
decisions = DecisionManager()
decisions = DecisionManager(asyncio.get_event_loop())

activity_result = decisions.schedule_activity(
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
Expand All @@ -22,7 +23,7 @@ async def test_activity_dispatch():


async def test_simple_cancellation():
decisions = DecisionManager()
decisions = DecisionManager(asyncio.get_event_loop())

activity_result = decisions.schedule_activity(
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
Expand All @@ -34,7 +35,7 @@ async def test_simple_cancellation():


async def test_cancellation_not_immediate():
decisions = DecisionManager()
decisions = DecisionManager(asyncio.get_event_loop())

activity_result = decisions.schedule_activity(
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
Expand All @@ -47,7 +48,7 @@ async def test_cancellation_not_immediate():


async def test_cancellation_completed():
decisions = DecisionManager()
decisions = DecisionManager(asyncio.get_event_loop())

activity_result = decisions.schedule_activity(
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
Expand Down Expand Up @@ -78,7 +79,7 @@ async def test_cancellation_completed():


async def test_collect_decisions():
decisions = DecisionManager()
decisions = DecisionManager(asyncio.get_event_loop())

activity1 = decisions.schedule_activity(
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
Expand All @@ -105,7 +106,7 @@ async def test_collect_decisions():


async def test_collect_decisions_ignore_empty():
decisions = DecisionManager()
decisions = DecisionManager(asyncio.get_event_loop())

_ = decisions.schedule_activity(
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
Expand All @@ -117,7 +118,7 @@ async def test_collect_decisions_ignore_empty():

async def test_collection_decisions_reordering():
# Decisions should be emitted in the order that they happened within the workflow
decisions = DecisionManager()
decisions = DecisionManager(asyncio.get_event_loop())

activity1 = decisions.schedule_activity(
decision.ScheduleActivityTaskDecisionAttributes(activity_id="a")
Expand Down