|
1 | 1 | import base64 |
2 | 2 | import logging |
3 | | -import multiprocessing |
4 | | -import multiprocessing.pool |
5 | | -import signal |
6 | | -import time |
7 | | -from multiprocessing.process import current_process |
8 | 3 | import os |
9 | 4 | from pathlib import Path |
10 | 5 | from typing import cast, Dict, List, Tuple, Optional |
|
34 | 29 | class EarlySystemExit(Exception): |
35 | 30 | """Wrapper for `SystemExit` exception. |
36 | 31 |
|
37 | | - To ensure that the worker process does not shutdown but rather handles the `SystemExit` as an |
| 32 | + To ensure that the worker process does not shut down but rather handles the `SystemExit` as an |
38 | 33 | error |
39 | 34 | """ |
40 | 35 |
|
41 | 36 | ... |
42 | 37 |
|
43 | 38 |
|
44 | | -def run_mesido(input_esdl: str) -> Tuple[Optional[str], List[EsdlMessage]]: |
45 | | - """Run mesido using the specific workflow. |
| 39 | +def grow_worker_task( |
| 40 | + input_esdl: str, workflow_config: ProtobufDict, update_progress_handler: UpdateProgressHandler |
| 41 | +) -> Tuple[Optional[str], List[EsdlMessage]]: |
| 42 | + """Run the grow worker task and run configured specific problem type for this worker instance. |
46 | 43 |
|
47 | | - Note: This is run without a subprocess! Casadi does not yield the GIL and therefore |
48 | | - causes starved thread issues. |
| 44 | + Note: Be careful! This spawns within a subprocess and gains a copy of memory from parent |
| 45 | + process. You cannot open sockets and other resources in the main process and expect |
| 46 | + it to be copied to subprocess. Any resources e.g. connections/sockets need to be opened |
| 47 | + in this task by the subprocess. |
49 | 48 |
|
50 | 49 | :param input_esdl: The input ESDL XML string. |
| 50 | + :param workflow_config: Extra parameters to configure this run. |
| 51 | + :param update_progress_handler: Handler to notify of any progress changes. |
51 | 52 | :return: GROW optimized or simulated ESDL and a list of ESDL feedback messages. |
52 | 53 | """ |
53 | 54 | mesido_func = get_problem_function(GROW_TASK_TYPE) |
@@ -77,7 +78,7 @@ def run_mesido(input_esdl: str) -> Tuple[Optional[str], List[EsdlMessage]]: |
77 | 78 | influxdb_password=os.environ.get("INFLUXDB_PASSWORD"), |
78 | 79 | influxdb_ssl=False, |
79 | 80 | influxdb_verify_ssl=False, |
80 | | - update_progress_function=None, |
| 81 | + update_progress_function=update_progress_handler, |
81 | 82 | profile_reader=InfluxDBProfileReader, |
82 | 83 | ) |
83 | 84 | esdl_str = cast(str, solution.optimized_esdl_string) |
@@ -121,70 +122,5 @@ def parse_mesido_esdl_messages( |
121 | 122 | return esdl_messages |
122 | 123 |
|
123 | 124 |
|
124 | | -def kill_pool(pool: multiprocessing.pool.Pool) -> None: |
125 | | - """Terminate all the process of a multiprocessing.Pool with SIGKILL. |
126 | | -
|
127 | | - Found here: https://stackoverflow.com/a/47580796 |
128 | | -
|
129 | | - multiprocessing.Pool.terminate does not provide a way to give a different signal than SIGTERM |
130 | | - so this function hooks into the internals to properly handle sending SIGKILL to all processes in |
131 | | - the pool. |
132 | | -
|
133 | | - :param pool: The multiprocessing pool to kill all processes in. |
134 | | - """ |
135 | | - # |
136 | | - # stop repopulating new child |
137 | | - pool._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined] |
138 | | - pool._worker_handler._state = multiprocessing.pool.TERMINATE # type: ignore[attr-defined] |
139 | | - for p in pool._pool: # type: ignore[attr-defined] |
140 | | - if p.is_alive(): |
141 | | - logger.warning("Sending SIGKILL to pool process with pid %s", p.pid) |
142 | | - os.kill(p.pid, signal.SIGKILL) |
143 | | - # .is_alive() will reap dead process |
144 | | - wait_till = time.time() + 5.0 |
145 | | - while ( |
146 | | - any(p.is_alive() for p in pool._pool) # type: ignore[attr-defined] |
147 | | - and time.time() < wait_till |
148 | | - ): |
149 | | - pass |
150 | | - logger.warning("All processes in pool have been terminated.") |
151 | | - pool.terminate() |
152 | | - logger.warning("Forceful pool termination completed.") |
153 | | - |
154 | | - |
155 | | -def grow_worker_task( |
156 | | - input_esdl: str, workflow_config: ProtobufDict, update_progress_handler: UpdateProgressHandler |
157 | | -) -> Tuple[Optional[str], List[EsdlMessage]]: |
158 | | - """Run the grow worker task and run configured specific problem type for this worker instance. |
159 | | -
|
160 | | - Note: Be careful! This spawns within a subprocess and gains a copy of memory from parent |
161 | | - process. You cannot open sockets and other resources in the main process and expect |
162 | | - it to be copied to subprocess. Any resources e.g. connections/sockets need to be opened |
163 | | - in this task by the subprocess. |
164 | | -
|
165 | | - :param input_esdl: The input ESDL XML string. |
166 | | - :param workflow_config: Extra parameters to configure this run. |
167 | | - :param update_progress_handler: Handler to notify of any progress changes. |
168 | | - :return: GROW optimized or simulated ESDL and a list of ESDL feedback messages. |
169 | | - """ |
170 | | - # TODO Very nasty hack. Celery unfortunately starts the worker subprocesses as 'daemons' |
171 | | - # which prevents this process from creating any other subprocesses. Therefore, we |
172 | | - # acknowledge this process is a daemon and turn off the protection that prevents new |
173 | | - # subprocesses from being created. This does introduce the issue that if this |
174 | | - # process is killed/cancelled/revoked, the subprocess will continue as a zombie process. |
175 | | - # See https://github.com/Project-OMOTES/optimizer-worker/issues/54 |
176 | | - current_process()._config["daemon"] = False # type: ignore[attr-defined] |
177 | | - |
178 | | - with multiprocessing.Pool(1) as pool: |
179 | | - try: |
180 | | - output = pool.map(run_mesido, [input_esdl])[0] |
181 | | - except SystemExit as e: |
182 | | - logger.warning("During pool the worker was requested to quit: %s %s", type(e), e) |
183 | | - kill_pool(pool) |
184 | | - raise |
185 | | - |
186 | | - return output |
187 | | - |
188 | | - |
189 | 125 | if __name__ == "__main__": |
190 | 126 | initialize_worker(GROW_TASK_TYPE.value, grow_worker_task) |
0 commit comments