@@ -254,13 +254,8 @@ def run(self, graph, config, updatehash=False):
254254 num_jobs = len (self .pending_tasks )
255255 logger .debug ('Number of pending tasks: %d' % num_jobs )
256256 if num_jobs < self .max_jobs :
257- if np .isinf (self .max_jobs ):
258- slots = None
259- else :
260- slots = max (0 , self .max_jobs - num_jobs )
261- logger .debug ('Slots available: %s' % slots )
262257 self ._send_procs_to_workers (updatehash = updatehash ,
263- slots = slots , graph = graph )
258+ graph = graph )
264259 else :
265260 logger .debug ('Not submitting' )
266261 sleep (float (self ._config ['execution' ]['poll_sleep_duration' ]))
@@ -324,16 +319,21 @@ def _submit_mapnode(self, jobid):
324319 np .zeros (numnodes , dtype = bool )))
325320 return False
326321
327- def _send_procs_to_workers (self , updatehash = False , slots = None , graph = None ):
322+ def _send_procs_to_workers (self , updatehash = False , graph = None ):
328323 """ Sends jobs to workers
329324 """
330325 while np .any (self .proc_done == False ):
326+ num_jobs = len (self .pending_tasks )
327+ if np .isinf (self .max_jobs ):
328+ slots = None
329+ else :
330+ slots = max (0 , self .max_jobs - num_jobs )
331+ logger .debug ('Slots available: %s' % slots )
332+ if (num_jobs >= self .max_jobs ) or (slots == 0 ):
333+ break
331334 # Check to see if a job is available
332335 jobids = np .flatnonzero ((self .proc_done == False ) &
333336 (self .depidx .sum (axis = 0 ) == 0 ).__array__ ())
334- num_jobs = len (self .pending_tasks )
335- if num_jobs >= self .max_jobs :
336- break
337337 if len (jobids ) > 0 :
338338 # send all available jobs
339339 logger .info ('Submitting %d jobs' % len (jobids [:slots ]))
0 commit comments