Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 29 additions & 41 deletions oocana/oocana/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,23 @@ async def oomol_token(self) -> str:
"""
return os.getenv("OOMOL_TOKEN", "")

def _wrap_output_with_warning(self, key: str, value: Any) -> tuple[Any, bool]:
"""
Wrap output value with error handling.

Args:
key: The output handle key
value: The value to wrap

Returns:
A tuple of (wrapped_value, success). If success is False, wrapped_value is None.
"""
try:
return self.__wrap_output_value(key, value), True
except (ValueError, IOError) as e:
self.send_warning(f"{e}")
return None, False

def output(self, key: str, value: Any, *, to_node: list[ToNode] | None = None, to_flow: list[ToFlow] | None = None):
"""
output the value to the next block
Expand All @@ -444,57 +461,35 @@ def output(self, key: str, value: Any, *, to_node: list[ToNode] | None = None, t
:param to_flow: list[ToFlow] | None, the target flow(with output handle) to send the output
if both to_node and to_flow are None, the output will be sent to all connected nodes and flows.
"""

try:
wrap_value = self.__wrap_output_value(key, value)
except ValueError as e:
self.send_warning(
f"{e}"
)
return
except IOError as e:
self.send_warning(
f"{e}"
)
wrap_value, success = self._wrap_output_with_warning(key, value)
if not success:
return

target = None
if to_node is not None or to_flow is not None:
target = {
"to_node": to_node,
"to_flow": to_flow,
}
else:
target = None
target = {"to_node": to_node, "to_flow": to_flow}

payload = {
payload: Dict[str, Any] = {
"type": "BlockOutput",
"handle": key,
"output": wrap_value,
}
if target is not None:
payload["options"] = {"target": target}
self.__mainframe.send(self.job_info, payload)

def outputs(self, outputs: Dict[str, Any]):
"""
output the value to the next block

map: Dict[str, Any], the key of the output, should be defined in the block schema output defs, the field name is handle
"""

values = {}
for key, value in outputs.items():
try:
wrap_value = self.__wrap_output_value(key, value)
wrap_value, success = self._wrap_output_with_warning(key, value)
if success:
values[key] = wrap_value
except ValueError as e:
self.send_warning(
f"{e}"
)
except IOError as e:
self.send_warning(
f"{e}"
)

self.__mainframe.send(self.job_info, {
"type": "BlockOutputs",
"outputs": values,
Expand All @@ -516,16 +511,9 @@ def finish(self, *, result: Dict[str, Any] | None = None, error: str | None = No
wrap_result = {}
if isinstance(result, dict):
for key, value in result.items():
try:
wrap_result[key] = self.__wrap_output_value(key, value)
except ValueError as e:
self.send_warning(
f"Output handle key: [{key}] is not defined in Block outputs schema. {e}"
)
except IOError as e:
self.send_warning(
f"Output handle key: [{key}] is not defined in Block outputs schema. {e}"
)
wrap_value, success = self._wrap_output_with_warning(key, value)
if success:
wrap_result[key] = wrap_value

self.__mainframe.send(self.job_info, {"type": "BlockFinished", "result": wrap_result})
else:
Expand Down