1111
1212# Import packages
1313import os
14- from multiprocessing import Process , Pool , cpu_count , pool
14+ from multiprocessing import cpu_count
15+ from concurrent .futures import ProcessPoolExecutor
1516from traceback import format_exception
1617import sys
1718from logging import INFO
@@ -74,25 +75,6 @@ def run_node(node, updatehash, taskid):
7475 return result
7576
7677
77- class NonDaemonProcess (Process ):
78- """A non-daemon process to support internal multiprocessing.
79- """
80-
81- def _get_daemon (self ):
82- return False
83-
84- def _set_daemon (self , value ):
85- pass
86-
87- daemon = property (_get_daemon , _set_daemon )
88-
89-
90- class NonDaemonPool (pool .Pool ):
91- """A process pool with non-daemon processes.
92- """
93- Process = NonDaemonProcess
94-
95-
9678class MultiProcPlugin (DistributedPluginBase ):
9779 """
9880 Execute workflow with multiprocessing, not sending more jobs at once
@@ -139,8 +121,6 @@ def __init__(self, plugin_args=None):
139121 self ._cwd = os .getcwd ()
140122
141123 # Read in options or set defaults.
142- non_daemon = self .plugin_args .get ('non_daemon' , True )
143- maxtasks = self .plugin_args .get ('maxtasksperchild' , 10 )
144124 self .processors = self .plugin_args .get ('n_procs' , cpu_count ())
145125 self .memory_gb = self .plugin_args .get (
146126 'memory_gb' , # Allocate 90% of system memory
@@ -149,30 +129,19 @@ def __init__(self, plugin_args=None):
149129 True )
150130
151131 # Instantiate different thread pools for non-daemon processes
152- logger .debug ('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
153- 'mem_gb=%0.2f, cwd=%s)' , 'non' * int ( non_daemon ),
132+ logger .debug ('[MultiProc] Starting (n_procs=%d, '
133+ 'mem_gb=%0.2f, cwd=%s)' ,
154134 self .processors , self .memory_gb , self ._cwd )
155135
156- NipypePool = NonDaemonPool if non_daemon else Pool
157- try :
158- self .pool = NipypePool (
159- processes = self .processors ,
160- maxtasksperchild = maxtasks ,
161- initializer = os .chdir ,
162- initargs = (self ._cwd ,)
163- )
164- except TypeError :
165- # Python < 3.2 does not have maxtasksperchild
166- # When maxtasksperchild is not set, initializer is not to be
167- # called
168- self .pool = NipypePool (processes = self .processors )
136+ self .pool = ProcessPoolExecutor (max_workers = self .processors )
169137
170138 self ._stats = None
171139
172140 def _async_callback (self , args ):
173141 # Make sure runtime is not left at a dubious working directory
174142 os .chdir (self ._cwd )
175- self ._taskresult [args ['taskid' ]] = args
143+ result = args .result ()
144+ self ._taskresult [result ['taskid' ]] = result
176145
177146 def _get_result (self , taskid ):
178147 return self ._taskresult .get (taskid )
@@ -187,9 +156,9 @@ def _submit_job(self, node, updatehash=False):
187156 if getattr (node .interface , 'terminal_output' , '' ) == 'stream' :
188157 node .interface .terminal_output = 'allatonce'
189158
190- self . _task_obj [ self . _taskid ] = self .pool .apply_async (
191- run_node , ( node , updatehash , self ._taskid ),
192- callback = self ._async_callback )
159+ result_future = self .pool .submit ( run_node , node , updatehash , self . _taskid )
160+ result_future . add_done_callback ( self ._async_callback )
161+ self ._task_obj [ self . _taskid ] = result_future
193162
194163 logger .debug ('[MultiProc] Submitted task %s (taskid=%d).' ,
195164 node .fullname , self ._taskid )
@@ -218,7 +187,7 @@ def _prerun_check(self, graph):
218187 raise RuntimeError ('Insufficient resources available for job' )
219188
220189 def _postrun_check (self ):
221- self .pool .close ()
190+ self .pool .shutdown ()
222191
223192 def _check_resources (self , running_tasks ):
224193 """
0 commit comments