Skip to content

Commit f06b253

Browse files
authored
Merge pull request #87 from Project-OMOTES/86-propagating-informational-error-messages-from-workers-to-frontend
86 propagating informational error messages from workers to frontend
2 parents dd95ec2 + d779aaf commit f06b253

File tree

9 files changed

+137
-224
lines changed

9 files changed

+137
-224
lines changed

ci/linux/gen_protocol.sh

Lines changed: 0 additions & 12 deletions
This file was deleted.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ classifiers = [
2525

2626
dependencies = [
2727
"aio-pika ~= 9.4.2",
28-
"omotes-sdk-protocol ~= 0.1.6",
28+
"omotes-sdk-protocol ~= 0.1.13",
2929
"pyesdl ~= 24.2",
3030
"pamqp ~= 3.3.0",
3131
"celery ~= 5.3.6",
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from dataclasses import dataclass
2+
from enum import Enum
3+
from typing import Optional
4+
5+
from omotes_sdk_protocol.job_pb2 import EsdlMessage as EsdlMessagePb
6+
7+
8+
class MessageSeverity(Enum):
9+
"""Message severity options."""
10+
11+
DEBUG = "DEBUG"
12+
INFO = "INFO"
13+
WARNING = "WARNING"
14+
ERROR = "ERROR"
15+
16+
17+
@dataclass
18+
class EsdlMessage:
19+
"""Esdl feedback message, optionally related to a specific object (asset)."""
20+
21+
technical_message: str
22+
"""Technical message."""
23+
severity: MessageSeverity
24+
"""Message severity."""
25+
esdl_object_id: Optional[str] = None
26+
"""Optional esdl object id, None implies a general energy system message."""
27+
28+
def to_protobuf_message(self) -> EsdlMessagePb:
29+
"""Generate a protobuf message from this class.
30+
31+
:return: Protobuf message representation.
32+
"""
33+
return EsdlMessagePb(
34+
technical_message=self.technical_message,
35+
severity=EsdlMessagePb.Severity.Value(self.severity.value),
36+
esdl_object_id=self.esdl_object_id,
37+
)

src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.py

Lines changed: 0 additions & 50 deletions
This file was deleted.

src/omotes_sdk/internal/orchestrator_worker_events/messages/task_pb2.pyi

Lines changed: 0 additions & 105 deletions
This file was deleted.

src/omotes_sdk/internal/worker/worker.py

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import socket
44
import sys
5-
from typing import Callable, Dict, List, Any, Optional
5+
from typing import Callable, Dict, List, Any, Optional, Tuple
66
from uuid import UUID
77

88
import streamcapture
@@ -13,9 +13,10 @@
1313
from esdl import EnergySystem
1414
from esdl.esdl_handler import EnergySystemHandler
1515

16+
from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import EsdlMessage
1617
from omotes_sdk.internal.worker.configs import WorkerConfig
1718
from omotes_sdk.internal.common.broker_interface import BrokerInterface
18-
from omotes_sdk.internal.orchestrator_worker_events.messages.task_pb2 import (
19+
from omotes_sdk_protocol.internal.task_pb2 import (
1920
TaskResult,
2021
TaskProgressUpdate,
2122
)
@@ -99,11 +100,13 @@ class WorkerTask(CeleryTask):
99100
broker_if: BrokerInterface
100101

101102
output_esdl: Optional[str]
103+
esdl_messages: List[EsdlMessage]
102104

103105
def __init__(self) -> None:
104106
"""Create the worker task."""
105107
super().__init__()
106108
self.output_esdl = None
109+
self.esdl_messages = []
107110

108111
def before_start(self, task_id: str, args: List[Any], kwargs: Dict[str, Any]) -> None:
109112
"""Runs before task start.
@@ -128,7 +131,7 @@ def after_return(
128131
kwargs: Dict[str, Any],
129132
einfo: str,
130133
) -> None:
131-
"""Runs after task start.
134+
"""Runs after task finished.
132135
133136
:param status: Task status.
134137
:param retval: Task return value.
@@ -143,13 +146,18 @@ def after_return(
143146
logs = self.logs.getvalue().decode()
144147
self.logs.close()
145148

149+
# to protobuf esdl messages:
150+
esdl_messages_pb = [
151+
esdl_message.to_protobuf_message() for esdl_message in self.esdl_messages
152+
]
153+
146154
job_id: UUID = args[0]
147155
job_reference: str = args[1]
148156

149157
result_message = None
150-
if status == "SUCCESS":
158+
if status == "FAILURE" or not self.output_esdl:
151159
logger.info(
152-
"Job %s (celery task id %s) with reference %s was successful.",
160+
"Job %s (celery task id %s) with reference %s failed.",
153161
job_id,
154162
self.request.id,
155163
job_reference,
@@ -158,14 +166,14 @@ def after_return(
158166
job_id=str(job_id),
159167
celery_task_id=self.request.id,
160168
celery_task_type=WORKER_TASK_TYPE,
161-
result_type=TaskResult.ResultType.SUCCEEDED,
162-
output_esdl=self.output_esdl,
169+
result_type=TaskResult.ResultType.ERROR,
170+
output_esdl="",
163171
logs=logs,
172+
esdl_messages=esdl_messages_pb,
164173
)
165-
166-
elif status == "FAILURE":
174+
elif status == "SUCCESS":
167175
logger.info(
168-
"Job %s (celery task id %s) with reference %s failed.",
176+
"Job %s (celery task id %s) with reference %s was successful.",
169177
job_id,
170178
self.request.id,
171179
job_reference,
@@ -174,9 +182,10 @@ def after_return(
174182
job_id=str(job_id),
175183
celery_task_id=self.request.id,
176184
celery_task_type=WORKER_TASK_TYPE,
177-
result_type=TaskResult.ResultType.ERROR,
178-
output_esdl="",
185+
result_type=TaskResult.ResultType.SUCCEEDED,
186+
output_esdl=self.output_esdl,
179187
logs=logs,
188+
esdl_messages=esdl_messages_pb,
180189
)
181190
else:
182191
logger.error(
@@ -263,21 +272,28 @@ def wrapped_worker_task(
263272
logger.info("Worker started new task %s with reference %s", job_id, job_reference)
264273
task_util = TaskUtil(job_id, task, task.broker_if)
265274
task_util.send_start()
266-
output_esdl = WORKER_TASK_FUNCTION(input_esdl, params_dict, task_util.update_progress)
267-
268-
input_esh = pyesdl_from_string(input_esdl)
269-
input_energy_system: EnergySystem = input_esh.energy_system
270-
if job_reference is None:
271-
new_name = f"{input_energy_system.name}_{WORKER_TASK_TYPE}"
272-
elif job_reference == "":
273-
new_name = f"{input_energy_system.name}"
275+
output_esdl, esdl_messages = WORKER_TASK_FUNCTION(
276+
input_esdl, params_dict, task_util.update_progress
277+
)
278+
279+
if output_esdl:
280+
input_esh = pyesdl_from_string(input_esdl)
281+
input_energy_system: EnergySystem = input_esh.energy_system
282+
if job_reference is None:
283+
new_name = f"{input_energy_system.name}_{WORKER_TASK_TYPE}"
284+
elif job_reference == "":
285+
new_name = f"{input_energy_system.name}"
286+
else:
287+
new_name = f"{input_energy_system.name}_{job_reference}"
288+
289+
output_esh = pyesdl_from_string(output_esdl)
290+
output_energy_system: EnergySystem = output_esh.energy_system
291+
output_energy_system.name = new_name
292+
task.output_esdl = output_esh.to_string()
274293
else:
275-
new_name = f"{input_energy_system.name}_{job_reference}"
294+
task.output_esdl = None
276295

277-
output_esh = pyesdl_from_string(output_esdl)
278-
output_energy_system: EnergySystem = output_esh.energy_system
279-
output_energy_system.name = new_name
280-
task.output_esdl = output_esh.to_string()
296+
task.esdl_messages = esdl_messages
281297

282298
task_util.update_progress(1.0, "Calculation finished.")
283299

@@ -345,7 +361,13 @@ def start(self) -> None:
345361

346362

347363
UpdateProgressHandler = Callable[[float, str], None]
348-
WorkerTaskF = Callable[[str, ProtobufDict, UpdateProgressHandler], str]
364+
WorkerTaskF = Callable[
365+
[str, ProtobufDict, UpdateProgressHandler],
366+
Tuple[
367+
Optional[str],
368+
List[EsdlMessage],
369+
],
370+
]
349371

350372
WORKER: Worker = None # type: ignore [assignment] # noqa
351373
WORKER_TASK_FUNCTION: WorkerTaskF = None # type: ignore [assignment] # noqa

0 commit comments

Comments
 (0)