Skip to content

Commit 70ebfa1

Browse files
Copilotberndverst
andcommitted
Add flexible entity dispatch with EntityBase class support
Co-authored-by: berndverst <4535280+berndverst@users.noreply.github.com>
1 parent 539eb29 commit 70ebfa1

File tree

5 files changed

+660
-3
lines changed

5 files changed

+660
-3
lines changed

durabletask/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from durabletask.worker import ConcurrencyOptions
77
from durabletask.task import (
88
EntityContext, EntityState, EntityQuery, EntityQueryResult,
9-
EntityInstanceId, EntityOperationFailedException
9+
EntityInstanceId, EntityOperationFailedException, EntityBase, dispatch_to_entity_method
1010
)
1111

1212
__all__ = [
@@ -16,7 +16,9 @@
1616
"EntityQuery",
1717
"EntityQueryResult",
1818
"EntityInstanceId",
19-
"EntityOperationFailedException"
19+
"EntityOperationFailedException",
20+
"EntityBase",
21+
"dispatch_to_entity_method"
2022
]
2123

2224
PACKAGE_NAME = "durabletask"

durabletask/task.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,136 @@ def start_new_orchestration(self, orchestrator: Union[Orchestrator[TInput, TOutp
691691
# Activities are simple functions that can be scheduled by orchestrators
692692
Activity = Callable[[ActivityContext, TInput], TOutput]
693693

694+
class EntityBase:
695+
"""Base class for entity implementations that provides method-based dispatch.
696+
697+
This class allows entities to be implemented as classes with methods for each operation,
698+
similar to the .NET TaskEntity pattern. The entity context is automatically injected
699+
when methods are called.
700+
"""
701+
702+
def __init__(self):
703+
self._context: Optional[EntityContext] = None
704+
self._state: Optional[Any] = None
705+
706+
@property
707+
def context(self) -> EntityContext:
708+
"""Get the current entity context."""
709+
if self._context is None:
710+
raise RuntimeError("Entity context is not available outside of operation execution")
711+
return self._context
712+
713+
def get_state(self, state_type: type[T] = None) -> Optional[T]:
714+
"""Get the current state of the entity."""
715+
return self._state
716+
717+
def set_state(self, state: Any) -> None:
718+
"""Set the current state of the entity."""
719+
self._state = state
720+
721+
def signal_entity(self, entity_id: Union[str, EntityInstanceId], operation_name: str, *,
722+
input: Optional[Any] = None) -> None:
723+
"""Signal another entity with an operation."""
724+
if self._context:
725+
self._context.signal_entity(entity_id, operation_name, input=input)
726+
727+
def start_new_orchestration(self, orchestrator: Union[Orchestrator[TInput, TOutput], str], *,
728+
input: Optional[TInput] = None,
729+
instance_id: Optional[str] = None) -> str:
730+
"""Start a new orchestration from within an entity operation."""
731+
if self._context:
732+
return self._context.start_new_orchestration(orchestrator, input=input, instance_id=instance_id)
733+
return ""
734+
735+
736+
def dispatch_to_entity_method(entity_obj: Any, ctx: EntityContext, input: Any) -> Any:
737+
"""
738+
Dispatch an entity operation to the appropriate method on an entity object.
739+
740+
This function implements flexible method dispatch similar to the .NET implementation:
741+
1. Look for an exact method name match (case-insensitive)
742+
2. If the entity is an EntityBase subclass, inject context and state
743+
3. Handle method parameters automatically (context, input, or both)
744+
745+
Parameters
746+
----------
747+
entity_obj : Any
748+
The entity object to dispatch to
749+
ctx : EntityContext
750+
The entity context
751+
input : Any
752+
The operation input
753+
754+
Returns
755+
-------
756+
Any
757+
The result of the operation
758+
"""
759+
import inspect
760+
761+
# Set up entity base if applicable
762+
if isinstance(entity_obj, EntityBase):
763+
entity_obj._context = ctx
764+
entity_obj._state = ctx.get_state()
765+
766+
# Look for a method with the operation name (case-insensitive)
767+
operation_name = ctx.operation_name.lower()
768+
method = None
769+
770+
for attr_name in dir(entity_obj):
771+
if attr_name.lower() == operation_name and callable(getattr(entity_obj, attr_name)):
772+
method = getattr(entity_obj, attr_name)
773+
break
774+
775+
if method is None:
776+
raise NotImplementedError(f"Entity does not implement operation '{ctx.operation_name}'")
777+
778+
# Inspect method signature to determine parameters
779+
sig = inspect.signature(method)
780+
args = []
781+
kwargs = {}
782+
783+
# Skip 'self' parameter for bound methods
784+
parameters = list(sig.parameters.values())
785+
if parameters and parameters[0].name == 'self':
786+
parameters = parameters[1:]
787+
788+
for param in parameters:
789+
param_type = param.annotation
790+
791+
# Check for EntityContext parameter
792+
if param_type == EntityContext or param.name.lower() in ['context', 'ctx']:
793+
if param.kind == param.POSITIONAL_OR_KEYWORD:
794+
args.append(ctx)
795+
else:
796+
kwargs[param.name] = ctx
797+
# Check for input parameter
798+
elif param.name.lower() in ['input', 'data', 'arg', 'value']:
799+
if param.kind == param.POSITIONAL_OR_KEYWORD:
800+
args.append(input)
801+
else:
802+
kwargs[param.name] = input
803+
# Default positional parameter (assume it's input)
804+
elif param.kind == param.POSITIONAL_OR_KEYWORD and len(args) == 0:
805+
args.append(input)
806+
807+
try:
808+
result = method(*args, **kwargs)
809+
810+
# Update state if entity is EntityBase
811+
if isinstance(entity_obj, EntityBase):
812+
ctx.set_state(entity_obj._state)
813+
entity_obj._context = None # Clear context after operation
814+
815+
return result
816+
817+
except Exception as ex:
818+
# Clear context on error
819+
if isinstance(entity_obj, EntityBase):
820+
entity_obj._context = None
821+
raise
822+
823+
694824
# Entities are stateful objects that can receive operations and maintain state
695825
Entity = Callable[['EntityContext', TInput], TOutput]
696826

durabletask/worker.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1402,7 +1402,20 @@ def execute(self, req: pb.EntityBatchRequest) -> pb.EntityBatchResult:
14021402
operation_input = shared.from_json(operation.input.value) if not ph.is_empty(operation.input) else None
14031403

14041404
# Execute the entity operation
1405-
operation_output = fn(ctx, operation_input)
1405+
if callable(fn):
1406+
# Check if it's a class (entity base) or function
1407+
if inspect.isclass(fn):
1408+
# Instantiate the entity class
1409+
entity_instance = fn()
1410+
operation_output = task.dispatch_to_entity_method(entity_instance, ctx, operation_input)
1411+
elif hasattr(fn, '__call__') and not inspect.isfunction(fn):
1412+
# It's an instance of a class, use method dispatch
1413+
operation_output = task.dispatch_to_entity_method(fn, ctx, operation_input)
1414+
else:
1415+
# It's a regular function
1416+
operation_output = fn(ctx, operation_input)
1417+
else:
1418+
raise TypeError(f"Entity '{entity_type}' is not callable")
14061419

14071420
# Update state for next operation
14081421
current_state = ctx.get_state()

0 commit comments

Comments
 (0)