Skip to content

Commit 2372e6c

Browse files
committed
feat: allow empty output workables
1 parent 3270b38 commit 2372e6c

2 files changed

Lines changed: 70 additions & 100 deletions

File tree

src/mpyflow/library/worker.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,14 @@ def __init__(
5656
self,
5757
name: str,
5858
workable_in: tuple[Workable[IN, OT], ...],
59-
workable_out: tuple[Workable[OT, Any], ...],
59+
workable_out: tuple[Workable[OT, Any], ...] = tuple(),
6060
hub: bool = False,
6161
print_size: int = 10000,
6262
/,
6363
) -> None:
6464
super().__init__()
65+
if len(workable_in) == 0:
66+
raise WorkerEx("At least one workable must be defined as input!")
6567
self.__name = name
6668
self.__syo: SyncStdoutInterface | None = None
6769
self.__sha: RedirectSysHandler | None = None
@@ -229,6 +231,13 @@ async def __write_to_all(self, erg: OT, writer_len: int, /) -> None:
229231
pointers.remove(local_pointer)
230232
self.evq.worker_queue.task_done()
231233

234+
async def __write(self, erg: OT, wpl: list[int], writer_len: int, /) -> None:
235+
if len(self.__wm.workable_out) > 0:
236+
if self.__hub:
237+
await self.__write_to_all(erg, writer_len)
238+
else:
239+
await self.__write_to_one(erg, wpl, writer_len)
240+
232241
async def __writer(self, p_name: str, /) -> int:
233242
counter_write = 0
234243
wpl: list[int] = []
@@ -247,10 +256,8 @@ async def __writer(self, p_name: str, /) -> int:
247256
else:
248257
counter_write += 1
249258
self.__print_con(p_name, f"write_hub_{self.__hub}", counter_write)
250-
if self.__hub:
251-
await self.__write_to_all(erg, writer_len)
252-
else:
253-
await self.__write_to_one(erg, wpl, writer_len)
259+
await self.__write(erg, wpl, writer_len)
260+
254261
except Exception:
255262
self.evq.stop_event_error.set()
256263
raise

src/mpyflow/shared/logger/manager.py

Lines changed: 58 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
import atexit
2-
3-
from rich.status import Status
4-
51
import json
62
from io import TextIOWrapper
73

@@ -186,15 +182,11 @@ class _EndConD:
186182
flush_log: ValueP[int]
187183

188184

189-
def _print_at_exit(output_stream: TextIOWrapper | None, sta: Status, /) -> None:
185+
def _print_at_exit(output_stream: TextIOWrapper | None, /) -> None:
190186
if output_stream is not None:
191187
output_stream.write("\n")
192188
output_stream.flush()
193189
output_stream.close()
194-
try:
195-
sta.__exit__(None, None, None)
196-
except ValueError:
197-
pass
198190

199191

200192
def _prog_bar_calc(count: int, prog: float, /) -> str:
@@ -242,99 +234,70 @@ def __rich__(self) -> str:
242234
return self.__formatter()
243235

244236

245-
@final
246-
class _SysOutPrinter:
247-
__slots__ = ("__console", "__log_manager")
248-
249-
def __init__(self, manager: LogManager, /) -> None:
250-
super().__init__()
251-
self.__console = Console(log_path=False)
252-
sta = self.__console.status(_RichCast(lambda: self.__print_status()))
253-
sta.__enter__()
254-
atexit.register(lambda: _print_at_exit(sys.__stdout__, sta))
255-
self.__log_manager = manager
256-
257-
def __print_status(self) -> str:
258-
uptime_loc = self.__log_manager.wa_process_attributes.started
259-
with self.__log_manager.manager_lock:
260-
act_obj = len(self.__log_manager.sync_object_manager.managed_dict)
261-
if act_obj > 0:
262-
mana = self.__log_manager.sync_object_manager.managed_dict
263-
gen = (elem[0] for elem in mana.values())
264-
prog: float = reduce((lambda x, y: x + y), gen) / (10 * act_obj)
265-
else:
266-
prog = 0.0
267-
msg_st = _format_global_status(
268-
act_obj, self.__log_manager.manager_done_counter.value, uptime_loc, prog
269-
)
270-
return f"[bold green]{msg_st}"
271-
272-
def print_to_console(self, message: SyncOutMsg, /) -> None:
273-
with self.__log_manager.manager_lock:
274-
_check_message(message)
275-
new_act_object = self.__log_manager.sync_object_manager.managed_dict.get(
276-
message.s_id, None
277-
)
278-
if new_act_object is not None:
279-
_check_object_status(message, new_act_object)
280-
new_act_object = (message.status, message.done)
281-
if message.done:
282-
self.__log_manager.manager_done_counter.value += 1
283-
self.__log_manager.sync_object_manager.managed_dict[message.s_id] = (
284-
new_act_object
285-
)
286-
287-
for msg in message.msg.split("\n"):
288-
if msg == "":
289-
continue
290-
self.__console.log(msg)
237+
def _print_status(manager: LogManager, /) -> str:
238+
uptime_loc = manager.wa_process_attributes.started
239+
with manager.manager_lock:
240+
act_obj = len(manager.sync_object_manager.managed_dict)
241+
if act_obj > 0:
242+
mana = manager.sync_object_manager.managed_dict
243+
gen = (elem[0] for elem in mana.values())
244+
prog: float = reduce((lambda x, y: x + y), gen) / (10 * act_obj)
245+
else:
246+
prog = 0.0
247+
msg_st = _format_global_status(
248+
act_obj, manager.manager_done_counter.value, uptime_loc, prog
249+
)
250+
return f"[bold green]{msg_st}"
291251

292252

293-
def _print_empty(
294-
printer: _SysOutPrinter, pid_cur: int | None, manager: LogManager, /
295-
) -> None:
296-
printer.print_to_console(
297-
SyncOutMsg(
298-
s_id=f"_OutputWaitingProcess_{pid_cur!s}_{manager.id_cnt!s}",
299-
msg="",
300-
status=0.0,
301-
done=False,
302-
)
303-
)
253+
def _p2c(manager: LogManager, message: SyncOutMsg, console: Console, /) -> None:
254+
with manager.manager_lock:
255+
_check_message(message)
256+
new_act_object = manager.sync_object_manager.managed_dict.get(message.s_id, None)
257+
if new_act_object is not None:
258+
_check_object_status(message, new_act_object)
259+
new_act_object = (message.status, message.done)
260+
if message.done:
261+
manager.manager_done_counter.value += 1
262+
manager.sync_object_manager.managed_dict[message.s_id] = new_act_object
263+
if len(message.msg) > 5:
264+
console.log(message.msg)
265+
time.sleep(1)
304266

305267

306268
def _waiting_output_str(manager: LogManager, /) -> None:
307269
pid_cur = current_process().pid
308270
running = True
309-
printer = _SysOutPrinter(manager)
310-
_print_empty(printer, pid_cur, manager)
311-
try:
312-
while running:
313-
try:
314-
erg = manager.get_from_queue()
315-
except queue.Empty:
316-
_print_empty(printer, pid_cur, manager)
317-
else:
318-
printer.print_to_console(erg)
319-
running = manager.run or not manager.queue_empty
320-
out_msg = r"_OutputWaitingProcess \[done]"
321-
act_obj = len(manager.sync_object_manager.managed_dict) - 1
322-
act_obj -= manager.manager_done_counter.value
323-
if act_obj > 0:
324-
out_msg = f"{out_msg} unfinished objects: {act_obj}"
325-
printer.print_to_console(
326-
SyncOutMsg(
327-
s_id=f"_OutputWaitingProcess_{pid_cur!s}_{manager.id_cnt!s}",
328-
msg=out_msg,
329-
status=100.0,
330-
done=True,
331-
)
332-
)
333-
except Exception:
334-
manager.set_manager_error()
335-
raise
336-
finally:
337-
manager.zero_status()
271+
cons = Console(log_path=False)
272+
with cons.status(_RichCast(lambda: _print_status(manager))):
273+
s_id = f"_OutputWaitingProcess_{pid_cur!s}_{manager.id_cnt!s}"
274+
sync = SyncOutMsg(s_id=s_id, msg="", status=0.0, done=False)
275+
_p2c(manager, sync, cons)
276+
try:
277+
while running:
278+
try:
279+
erg = manager.get_from_queue()
280+
except queue.Empty:
281+
s_id = f"_OutputWaitingProcess_{pid_cur!s}_{manager.id_cnt!s}"
282+
sync = SyncOutMsg(s_id=s_id, msg="", status=0.0, done=False)
283+
_p2c(manager, sync, cons)
284+
else:
285+
_p2c(manager, erg, cons)
286+
running = manager.run or not manager.queue_empty
287+
out_msg = r"_OutputWaitingProcess \[done]"
288+
act_obj = len(manager.sync_object_manager.managed_dict) - 1
289+
act_obj -= manager.manager_done_counter.value
290+
if act_obj > 0:
291+
out_msg = f"{out_msg} unfinished objects: {act_obj}"
292+
s_id = f"_OutputWaitingProcess_{pid_cur!s}_{manager.id_cnt!s}"
293+
sync = SyncOutMsg(s_id=s_id, msg=out_msg, status=100.0, done=True)
294+
_p2c(manager, sync, cons)
295+
except Exception:
296+
manager.set_manager_error()
297+
raise
298+
finally:
299+
manager.zero_status()
300+
_print_at_exit(sys.__stdout__)
338301

339302

340303
def _logger_process(

0 commit comments

Comments
 (0)