Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.

Commit 7178db1

Browse files
Add status property to WorkflowTaskImpl (#72)
1 parent 693168f commit 7178db1

1 file changed

Lines changed: 10 additions & 4 deletions

File tree

frinx/common/workflow/task.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import typing
21
from enum import Enum
32
from typing import Any
43
from typing import Optional
54
from typing import TypeAlias
5+
from typing import Union
6+
from typing import cast
67

78
from pydantic import BaseModel
89
from pydantic import ConfigDict
@@ -14,6 +15,7 @@
1415

1516
from frinx.common.conductor_enums import DoWhileEvaluatorType
1617
from frinx.common.conductor_enums import SwitchEvaluatorType
18+
from frinx.common.conductor_enums import TaskResultStatus
1719
from frinx.common.conductor_enums import WorkflowStatus
1820
from frinx.common.util import snake_to_camel_case
1921
from frinx.common.worker.worker import WorkerImpl
@@ -93,6 +95,10 @@ class WorkflowTaskImpl(BaseModel):
9395
populate_by_name=True
9496
)
9597

98+
@property
99+
def status(self) -> TaskResultStatus:
100+
return cast(TaskResultStatus, f'${{{self.task_reference_name}.status}}')
101+
96102
def output_ref(self, path: str | None = None) -> str:
97103
if not path or path is None:
98104
return f'${{{self.task_reference_name}.output}}'
@@ -118,7 +124,7 @@ class DynamicForkTaskInputParameters(BaseModel):
118124

119125

120126
class DynamicForkTaskFromDefInputParameters(BaseModel):
121-
dynamic_tasks: typing.Union[str, object]
127+
dynamic_tasks: Union[str, object]
122128
dynamic_tasks_input: str
123129

124130
model_config = ConfigDict(
@@ -181,7 +187,7 @@ class DynamicForkTask(WorkflowTaskImpl):
181187
type: TaskType = TaskType.FORK_JOIN_DYNAMIC
182188
dynamic_fork_tasks_param: str = Field(default='dynamicTasks')
183189
dynamic_fork_tasks_input_param_name: str = Field(default='dynamicTasksInput')
184-
input_parameters: typing.Union[
190+
input_parameters: Union[
185191
DynamicForkArraysTaskInputParameters,
186192
DynamicForkTaskInputParameters,
187193
DynamicForkArraysTaskFromDefInputParameters, DynamicForkTaskFromDefInputParameters
@@ -336,7 +342,7 @@ def check_input_values(cls, values: dict[str, Any]) -> dict[str, Any]:
336342

337343

338344
class StartWorkflowTaskInputParameters(BaseModel):
339-
start_workflow: typing.Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters]
345+
start_workflow: Union[StartWorkflowTaskPlainInputParameters, StartWorkflowTaskFromDefInputParameters]
340346

341347
model_config = ConfigDict(
342348
validate_assignment=True,

0 commit comments

Comments
 (0)